diff options
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/Ice/CommunicatorI.java | 28 | ||||
-rw-r--r-- | java/src/Ice/ObjectAdapterI.java | 388 | ||||
-rw-r--r-- | java/src/Ice/_ObjectDelM.java | 8 | ||||
-rw-r--r-- | java/src/IceInternal/Connection.java | 884 | ||||
-rw-r--r-- | java/src/IceInternal/Direct.java | 14 | ||||
-rw-r--r-- | java/src/IceInternal/Incoming.java | 168 | ||||
-rw-r--r-- | java/src/IceInternal/IncomingConnectionFactory.java | 161 | ||||
-rw-r--r-- | java/src/IceInternal/Instance.java | 138 | ||||
-rw-r--r-- | java/src/IceInternal/ObjectAdapterFactory.java | 87 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingConnectionFactory.java | 89 | ||||
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 370 | ||||
-rw-r--r-- | java/src/IceInternal/TraceUtil.java | 9 |
12 files changed, 1180 insertions, 1164 deletions
diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java index 5fc21f804ba..16602e18a28 100644 --- a/java/src/Ice/CommunicatorI.java +++ b/java/src/Ice/CommunicatorI.java @@ -22,11 +22,8 @@ class CommunicatorI extends LocalObjectImpl implements Communicator if(!_destroyed) // Don't destroy twice. { _destroyed = true; - - _instance.objectAdapterFactory().shutdown(); - _instance.destroy(); - _serverThreadPool = null; + _instance.destroy(); } } @@ -45,14 +42,23 @@ class CommunicatorI extends LocalObjectImpl implements Communicator public void waitForShutdown() { - // - // No mutex locking here, otherwise the communicator is blocked - // while waiting for shutdown. - // - if(_serverThreadPool != null) + IceInternal.ObjectAdapterFactory objectAdapterFactory; + + synchronized(this) { - _serverThreadPool.waitUntilFinished(); + if(_destroyed) + { + throw new CommunicatorDestroyedException(); + } + objectAdapterFactory = _instance.objectAdapterFactory(); } + + // + // We must call waitForShutdown on the object adapter factory + // outside the synchronization, otherwise the communicator is + // blocked while we wait for shutdown. + // + objectAdapterFactory.waitForShutdown(); } public synchronized Ice.ObjectPrx @@ -281,7 +287,7 @@ class CommunicatorI extends LocalObjectImpl implements Communicator // We need _serverThreadPool directly in CommunicatorI. That's // because the shutdown() operation is signal-safe, and thus must // not access any mutex locks or _instance. It may only access - // _serverThreadPool->initiateShutdown(), which is signal-safe as + // _serverThreadPool.initiateShutdown(), which is signal-safe as // well. // private IceInternal.ThreadPool _serverThreadPool; diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java index af1f959e361..adbf3faf573 100644 --- a/java/src/Ice/ObjectAdapterI.java +++ b/java/src/Ice/ObjectAdapterI.java @@ -19,18 +19,16 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public String getName() { - return _name; // _name is immutable + // + // No mutex lock necessary, _name is immutable. + // + return _name; } public synchronized Communicator getCommunicator() { - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } + checkForDeactivation(); return _communicator; } @@ -38,13 +36,8 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized void activate() { - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } - + checkForDeactivation(); + if(!_printAdapterReadyDone) { if(_locatorInfo != null && _id.length() > 0) @@ -101,13 +94,8 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized void hold() { - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } - + checkForDeactivation(); + final int sz = _incomingConnectionFactories.size(); for(int i = 0; i < sz; ++i) { @@ -120,72 +108,119 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized void waitForHold() { - // TODO: Not implemented yet. + checkForDeactivation(); + + final int sz = _incomingConnectionFactories.size(); + for(int i = 0; i < sz; ++i) + { + IceInternal.IncomingConnectionFactory factory = + (IceInternal.IncomingConnectionFactory)_incomingConnectionFactories.get(i); + factory.waitUntilHolding(); + } } - public void + public synchronized void deactivate() { - synchronized(this) + // + // Ignore deactivation requests if the object adapter has + // already been deactivated. + // + if(_instance == null) { - if(_instance == null) - { - // - // Ignore deactivation requests if the Object Adapter has - // already been deactivated. - // - return; - } - - final int sz = _incomingConnectionFactories.size(); - for(int i = 0; i < sz; ++i) - { - IceInternal.IncomingConnectionFactory factory = - (IceInternal.IncomingConnectionFactory)_incomingConnectionFactories.get(i); - factory.destroy(); - } - _incomingConnectionFactories.clear(); - - _instance.outgoingConnectionFactory().removeAdapter(this); - - _instance = null; - _communicator = null; + return; } - - decUsageCount(); + + final int sz = _incomingConnectionFactories.size(); + for(int i = 0; i < sz; ++i) + { + IceInternal.IncomingConnectionFactory factory = + (IceInternal.IncomingConnectionFactory)_incomingConnectionFactories.get(i); + factory.destroy(); + } + + _instance.outgoingConnectionFactory().removeAdapter(this); + + _instance = null; + _communicator = null; + + notifyAll(); } public synchronized void waitForDeactivate() { - assert(_usageCount >= 0); - - while(_usageCount > 0) + // + // First we wait for deactivation of the adapter itself, and + // for the return of all direct method calls using this + // adapter. + // + while(_instance != null || _directCount > 0) { try { wait(); } - catch(java.lang.InterruptedException ex) + catch(InterruptedException ex) { } } - assert(_usageCount == 0); + // + // Now we wait for until all incoming connection factories are + // finished. + // + final int sz = _incomingConnectionFactories.size(); + for(int i = 0; i < sz; ++i) + { + IceInternal.IncomingConnectionFactory factory = + (IceInternal.IncomingConnectionFactory)_incomingConnectionFactories.get(i); + factory.waitUntilFinished(); + } + + // + // We're done, now we can throw away all incoming connection + // factories. + // + _incomingConnectionFactories.clear(); + + // + // Now it's also time to clean up the active servant map. + // + _activeServantMap.clear(); + + // + // And the servant locators, too. + // + java.util.Iterator p = _locatorMap.entrySet().iterator(); + while(p.hasNext()) + { + java.util.Map.Entry e = (java.util.Map.Entry)p.next(); + ServantLocator locator = (ServantLocator)e.getValue(); + try + { + locator.deactivate(); + } + catch(RuntimeException ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "exception during locator deactivation:\n" + "object adapter: `" + _name + "'\n" + + "locator prefix: `" + e.getKey() + "'\n" + sw.toString(); + _logger.error(s); + } + } + _locatorMap.clear(); } public synchronized ObjectPrx add(Ice.Object servant, Identity ident) { - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } - - checkIdentity(ident); - + checkForDeactivation(); + checkIdentity(ident); + Ice.Object o = (Ice.Object)_activeServantMap.get(ident); if(o != null) { @@ -210,12 +245,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized ObjectPrx addWithUUID(Ice.Object servant) { - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } + checkForDeactivation(); long now = System.currentTimeMillis(); Identity ident = new Identity(); @@ -230,13 +260,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized void remove(Identity ident) { - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } - + checkForDeactivation(); checkIdentity(ident); Ice.Object o = (Ice.Object)_activeServantMap.get(ident); @@ -254,12 +278,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized void addServantLocator(ServantLocator locator, String prefix) { - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } + checkForDeactivation(); ServantLocator l = (ServantLocator)_locatorMap.get(prefix); if(l != null) @@ -276,12 +295,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized void removeServantLocator(String prefix) { - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } + checkForDeactivation(); ServantLocator l = (ServantLocator)_locatorMap.get(prefix); if(l == null) @@ -302,20 +316,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized ServantLocator findServantLocator(String prefix) { - // - // Don't check whether deactivation has been initiated. This - // operation might be called (e.g., from Incoming or Direct) - // after deactivation has been initiated, but before - // deactivation has been completed. - // - /* - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } - */ + checkForDeactivation(); return (ServantLocator)_locatorMap.get(prefix); } @@ -323,20 +324,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized Ice.Object identityToServant(Identity ident) { - // - // Don't check whether deactivation has been initiated. This - // operation might be called (e.g., from Incoming or Direct) - // after deactivation has been initiated, but before - // deactivation has been completed. - // - /* - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } - */ + checkForDeactivation(); // // Don't call checkIdentity. We simply want null to be @@ -360,13 +348,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized ObjectPrx createProxy(Identity ident) { - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } - + checkForDeactivation(); checkIdentity(ident); return newProxy(ident); @@ -375,13 +357,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized ObjectPrx createDirectProxy(Identity ident) { - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } - + checkForDeactivation(); checkIdentity(ident); return newDirectProxy(ident); @@ -390,13 +366,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized ObjectPrx createReverseProxy(Identity ident) { - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } - + checkForDeactivation(); checkIdentity(ident); // @@ -413,12 +383,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized void addRouter(RouterPrx router) { - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } + checkForDeactivation(); IceInternal.RouterInfo routerInfo = _instance.routerManager().get(router); if(routerInfo != null) @@ -463,12 +428,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized void setLocator(LocatorPrx locator) { - if(_instance == null) - { - ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException(); - e.name = _name; - throw e; - } + checkForDeactivation(); _locatorInfo = _instance.locatorManager().get(locator); } @@ -476,10 +436,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter public synchronized IceInternal.Connection[] getIncomingConnections() { - if(_instance == null) - { - throw new ObjectAdapterDeactivatedException(); - } + checkForDeactivation(); java.util.ArrayList connections = new java.util.ArrayList(); final int sz = _incomingConnectionFactories.size(); @@ -499,58 +456,28 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter } public synchronized void - incUsageCount() + incDirectCount() { - if(_instance == null) - { - throw new ObjectAdapterDeactivatedException(); - } + checkForDeactivation(); - assert(_usageCount >= 0); - ++_usageCount; + assert(_directCount >= 0); + ++_directCount; } public synchronized void - decUsageCount() + decDirectCount() { // // The object adapter may already have been deactivated when - // the usage count is decremented, thus there is no check for + // the direct count is decremented, thus there is no check for // prior deactivation. // - assert(_usageCount > 0); - --_usageCount; - - if(_usageCount == 0) + assert(_directCount > 0); + if(--_directCount == 0) { - _activeServantMap.clear(); - - java.util.Iterator p = _locatorMap.entrySet().iterator(); - while(p.hasNext()) - { - java.util.Map.Entry e = (java.util.Map.Entry)p.next(); - ServantLocator locator = (ServantLocator)e.getValue(); - try - { - locator.deactivate(); - } - catch(RuntimeException ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception during locator deactivation:\n" + "object adapter: `" + _name + "'\n" + - "locator prefix: `" + e.getKey() + "'\n" + sw.toString(); - _logger.error(s); - } - } - - _locatorMap.clear(); - notifyAll(); - } + } } // @@ -565,7 +492,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter _name = name; _id = id; _logger = instance.logger(); - _usageCount = 1; + _directCount = 0; String s = endpts.toLowerCase(); @@ -616,12 +543,18 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter finalize() throws Throwable { - assert(_usageCount == 0); - if(_instance != null) { - _instance.logger().warning("object adapter has not been deactivated"); + _instance.logger().warning("object adapter `" + _name + "' has not been deactivated"); } + else + { + assert(_communicator == null); + assert(_incomingConnectionFactories.size() == 0); + assert(_activeServantMap.size() == 0); + assert(_locatorMap.size() == 0); + assert(_directCount == 0); + } super.finalize(); } @@ -629,6 +562,8 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter private ObjectPrx newProxy(Identity ident) { + checkForDeactivation(); + if(_id.length() == 0) { return newDirectProxy(ident); @@ -651,6 +586,8 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter private ObjectPrx newDirectProxy(Identity ident) { + checkForDeactivation(); + IceInternal.Endpoint[] endpoints = new IceInternal.Endpoint[_incomingConnectionFactories.size() + _routerEndpoints.size()]; @@ -687,32 +624,11 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter return _instance.proxyFactory().referenceToProxy(reference); } - private static void - checkIdentity(Identity ident) - { - if(ident.name == null || ident.name.length() == 0) - { - IllegalIdentityException e = new IllegalIdentityException(); - try - { - e.id = (Identity)ident.clone(); - } - catch(CloneNotSupportedException ex) - { - assert(false); - } - throw e; - } - - if(ident.category == null) - { - ident.category = ""; - } - } - public boolean isLocal(ObjectPrx proxy) { + checkForDeactivation(); + IceInternal.Reference ref = ((ObjectPrxHelper)proxy).__reference(); final IceInternal.Endpoint[] endpoints = ref.endpoints; @@ -771,6 +687,40 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter return false; } + private void + checkForDeactivation() + { + if(_instance == null) + { + ObjectAdapterDeactivatedException ex = new ObjectAdapterDeactivatedException(); + ex.name = _name; + throw ex; + } + } + + private static void + checkIdentity(Identity ident) + { + if(ident.name == null || ident.name.length() == 0) + { + IllegalIdentityException e = new IllegalIdentityException(); + try + { + e.id = (Identity)ident.clone(); + } + catch(CloneNotSupportedException ex) + { + assert(false); + } + throw e; + } + + if(ident.category == null) + { + ident.category = ""; + } + } + private IceInternal.Instance _instance; private Communicator _communicator; private boolean _printAdapterReadyDone; @@ -782,5 +732,5 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter private java.util.ArrayList _incomingConnectionFactories = new java.util.ArrayList(); private java.util.ArrayList _routerEndpoints = new java.util.ArrayList(); private IceInternal.LocatorInfo _locatorInfo; - private int _usageCount; + private int _directCount; } diff --git a/java/src/Ice/_ObjectDelM.java b/java/src/Ice/_ObjectDelM.java index 12aabfcd57a..37c7dc17119 100644 --- a/java/src/Ice/_ObjectDelM.java +++ b/java/src/Ice/_ObjectDelM.java @@ -164,7 +164,7 @@ public class _ObjectDelM implements _ObjectDel __reference = from.__reference; __connection = from.__connection; - __connection.incUsageCount(); + __connection.incProxyCount(); } protected IceInternal.Connection __connection; @@ -212,7 +212,7 @@ public class _ObjectDelM implements _ObjectDel } assert(j < connections.length); __connection = connections[j]; - __connection.incUsageCount(); + __connection.incProxyCount(); } else { @@ -257,7 +257,7 @@ public class _ObjectDelM implements _ObjectDel IceInternal.OutgoingConnectionFactory factory = __reference.instance.outgoingConnectionFactory(); __connection = factory.create(filteredEndpoints); assert(__connection != null); - __connection.incUsageCount(); + __connection.incProxyCount(); } catch (LocalException ex) { @@ -453,7 +453,7 @@ public class _ObjectDelM implements _ObjectDel { if(__connection != null) { - __connection.decUsageCount(); + __connection.decProxyCount(); } while(__outgoingCache != null) diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java index 6b89ecc732b..3dc23113118 100644 --- a/java/src/IceInternal/Connection.java +++ b/java/src/IceInternal/Connection.java @@ -16,160 +16,173 @@ package IceInternal; public final class Connection extends EventHandler { - public boolean - destroyed() + public synchronized void + activate() { - _mutex.lock(); - try - { - return _state >= StateClosing; - } - finally - { - _mutex.unlock(); - } + setState(StateActive); } - public void - validate() + public synchronized void + hold() { - _mutex.lock(); - try - { - if(_endpoint.datagram()) + setState(StateHolding); + } + + // DestructionReason + public final static int ObjectAdapterDeactivated = 0; + public final static int CommunicatorDestroyed = 1; + + public synchronized void + destroy(int reason) + { + _batchStream.destroy(); + + switch(reason) + { + case ObjectAdapterDeactivated: { - // - // Datagram connections are always implicitly validated. - // - return; + setState(StateClosing, new Ice.ObjectAdapterDeactivatedException()); + break; } - - try + + case CommunicatorDestroyed: { - if(_adapter != null) - { - // - // Incoming connections play the active role with - // respect to connection validation. - // - BasicStream os = new BasicStream(_instance); - os.writeByte(Protocol.protocolVersion); - os.writeByte(Protocol.encodingVersion); - os.writeByte(Protocol.validateConnectionMsg); - os.writeInt(Protocol.headerSize); // Message size. - TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); - } - else - { - // - // Outgoing connection play the passive role with - // respect to connection validation. - // - BasicStream is = new BasicStream(_instance); - is.resize(Protocol.headerSize, true); - is.pos(0); - _transceiver.read(is, _endpoint.timeout()); - int pos = is.pos(); - assert(pos >= Protocol.headerSize); - is.pos(0); - byte protVer = is.readByte(); - if(protVer != Protocol.protocolVersion) - { - throw new Ice.UnsupportedProtocolException(); - } - byte encVer = is.readByte(); - if(encVer != Protocol.encodingVersion) - { - throw new Ice.UnsupportedEncodingException(); - } - byte messageType = is.readByte(); - if(messageType != Protocol.validateConnectionMsg) - { - throw new Ice.ConnectionNotValidatedException(); - } - int size = is.readInt(); - if(size != Protocol.headerSize) - { - throw new Ice.IllegalMessageSizeException(); - } - TraceUtil.traceHeader("received validate connection", is, _logger, _traceLevels); - } + setState(StateClosing, new Ice.CommunicatorDestroyedException()); + break; } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - assert(_exception != null); - throw _exception; - } } - finally - { - _mutex.unlock(); - } } - public void - hold() + public synchronized boolean + isDestroyed() { - _mutex.lock(); - try - { - setState(StateHolding); - } - finally - { - _mutex.unlock(); - } + return _state >= StateClosing; } - public void - activate() + public synchronized boolean + isFinished() { - _mutex.lock(); - try - { - setState(StateActive); - } - finally - { - _mutex.unlock(); + return _transceiver == null; + } + + public synchronized void + waitUntilHolding() + { + while(_state < StateHolding || _dispatchCount > 0) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } } } - public void - incUsageCount() + public synchronized void + waitUntilFinished() { - _mutex.lock(); - try - { - assert(_usageCount >= 0); - ++_usageCount; + while(_transceiver != null) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } } - finally - { - _mutex.unlock(); - } } - public void - decUsageCount() + public synchronized void + validate() { - _mutex.lock(); - try - { - assert(_usageCount > 0); - --_usageCount; - if(_usageCount == 0 && _adapter == null) + if(_endpoint.datagram()) + { + // + // Datagram connections are always implicitly validated. + // + return; + } + + try + { + if(_adapter != null) + { + // + // Incoming connections play the active role with + // respect to connection validation. + // + BasicStream os = new BasicStream(_instance); + os.writeByte(Protocol.protocolVersion); + os.writeByte(Protocol.encodingVersion); + os.writeByte(Protocol.validateConnectionMsg); + os.writeInt(Protocol.headerSize); // Message size. + TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); + } + else { - assert(_requests.isEmpty()); - setState(StateClosing, new Ice.CloseConnectionException()); + // + // Outgoing connection play the passive role with + // respect to connection validation. + // + BasicStream is = new BasicStream(_instance); + is.resize(Protocol.headerSize, true); + is.pos(0); + _transceiver.read(is, _endpoint.timeout()); + int pos = is.pos(); + assert(pos >= Protocol.headerSize); + is.pos(0); + byte protVer = is.readByte(); + if(protVer != Protocol.protocolVersion) + { + throw new Ice.UnsupportedProtocolException(); + } + byte encVer = is.readByte(); + if(encVer != Protocol.encodingVersion) + { + throw new Ice.UnsupportedEncodingException(); + } + byte messageType = is.readByte(); + if(messageType != Protocol.validateConnectionMsg) + { + throw new Ice.ConnectionNotValidatedException(); + } + int size = is.readInt(); + if(size != Protocol.headerSize) + { + throw new Ice.IllegalMessageSizeException(); + } + TraceUtil.traceHeader("received validate connection", is, _logger, _traceLevels); } } - finally - { - _mutex.unlock(); - } + catch(Ice.LocalException ex) + { + setState(StateClosed, ex); + assert(_exception != null); + throw _exception; + } + } + + public synchronized void + incProxyCount() + { + assert(_proxyCount >= 0); + ++_proxyCount; + } + + public synchronized void + decProxyCount() + { + assert(_proxyCount > 0); + --_proxyCount; + if(_proxyCount == 0 && _adapter == null) + { + assert(_requests.isEmpty()); + setState(StateClosing, new Ice.CloseConnectionException()); + } } private final static byte[] _requestHdr = @@ -187,62 +200,58 @@ public final class Connection extends EventHandler os.writeBlob(_requestHdr); } - public void + public synchronized void sendRequest(Outgoing out, boolean oneway) { - _mutex.lock(); - try - { - if(_exception != null) - { - throw _exception; - } - assert(_state < StateClosing); - - int requestId = 0; - - try - { - BasicStream os = out.os(); - os.pos(3); - - // - // Fill in the message size and request ID. - // - os.writeInt(os.size()); - if(!_endpoint.datagram() && !oneway) - { - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } - os.writeInt(requestId); - } - TraceUtil.traceRequest("sending request", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); - } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - assert(_exception != null); - throw _exception; - } - - // - // Only add to the request map if there was no exception, and if - // the operation is not oneway. - // - if(!_endpoint.datagram() && !oneway) - { - _requests.put(requestId, out); - } - } - finally - { - _mutex.unlock(); - } + if(_exception != null) + { + throw _exception; + } + assert(_state < StateClosing); + + int requestId = 0; + + try + { + BasicStream os = out.os(); + os.pos(3); + + // + // Fill in the message size and request ID. + // + os.writeInt(os.size()); + if(!_endpoint.datagram() && !oneway) + { + requestId = _nextRequestId++; + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + os.writeInt(requestId); + } + + // + // Send the request. + // + TraceUtil.traceRequest("sending request", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); + } + catch(Ice.LocalException ex) + { + setState(StateClosed, ex); + assert(_exception != null); + throw _exception; + } + + // + // Only add to the request map if there was no exception, and if + // the operation is not oneway. + // + if(!_endpoint.datagram() && !oneway) + { + _requests.put(requestId, out); + } } private final static byte[] _requestBatchHdr = @@ -253,14 +262,22 @@ public final class Connection extends EventHandler (byte)0, (byte)0, (byte)0, (byte)0 // Message size (placeholder). }; - public void + public synchronized void prepareBatchRequest(BasicStream os) { - _mutex.lock(); + while(_batchStreamInUse && _exception == null) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } if(_exception != null) { - _mutex.unlock(); throw _exception; } assert(_state < StateClosing); @@ -277,77 +294,165 @@ public final class Connection extends EventHandler // // Give the batch stream to caller, until finishBatchRequest() - // is called. + // or abortBatchRequest() is called. // - _batchStream.swap(os); + _batchStreamInUse = true; } - public void + public synchronized void finishBatchRequest(BasicStream os) { + assert(_batchStreamInUse); + if(_exception != null) { - _mutex.unlock(); throw _exception; } assert(_state < StateClosing); _batchStream.swap(os); // Get the batch stream back. - _mutex.unlock(); // Give the Connection back. + ++_batchRequestNum; // Increment the number of requests in the batch. + + // + // Give the Connection back. + // + _batchStreamInUse = false; + notifyAll(); } - public void + public synchronized void abortBatchRequest() { + assert(_batchStreamInUse); + setState(StateClosed, new Ice.AbortBatchRequestException()); - _mutex.unlock(); // Give the Connection back. + + // + // Give the Connection back. + // + _batchStreamInUse = false; + notifyAll(); } - public void + public synchronized void flushBatchRequest() { - _mutex.lock(); - try - { - if(_exception != null) - { - throw _exception; - } - assert(_state < StateClosing); - - try - { - if(_batchStream.size() == 0) - { - return; // Nothing to send. - } + while(_batchStreamInUse && _exception == null) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } - _batchStream.pos(3); + if(_exception != null) + { + throw _exception; + } + assert(_state < StateClosing); + + try + { + if(_batchStream.size() == 0) + { + return; // Nothing to send. + } + + _batchStream.pos(3); + + // + // Fill in the message size the number of requests in the batch. + // + _batchStream.writeInt(_batchStream.size()); + _batchStream.writeInt(_batchRequestNum); + + // + // Send the batch request. + // + TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + _transceiver.write(_batchStream, _endpoint.timeout()); + + // + // Reset _batchStream so that new batch messages can be sent. + // + _batchStream.destroy(); + _batchStream = new BasicStream(_instance); + _batchRequestNum = 0; + } + catch(Ice.LocalException ex) + { + setState(StateClosed, ex); + assert(_exception != null); + throw _exception; + } + } - // - // Fill in the message size. - // - _batchStream.writeInt(_batchStream.size()); - TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); - _transceiver.write(_batchStream, _endpoint.timeout()); + public synchronized void + sendResponse(BasicStream os) + { + try + { + if(_state == StateClosed) + { + return; + } + + // + // Fill in the message size. + // + os.pos(3); + final int sz = os.size(); + os.writeInt(sz); + + // + // Send the reply. + // + TraceUtil.traceReply("sending reply", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); + + if(--_dispatchCount == 0) + { + notifyAll(); + } + + if(_state == StateClosing && _dispatchCount == 0) + { + initiateShutdown(); + } + } + catch(Ice.LocalException ex) + { + setState(StateClosed, ex); + } + } - // - // Reset _batchStream so that new batch messages can be sent. - // - _batchStream.destroy(); - _batchStream = new BasicStream(_instance); - } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - assert(_exception != null); - throw _exception; - } - } - finally - { - _mutex.unlock(); - } + public synchronized void + sendNoResponse() + { + try + { + if(_state == StateClosed) + { + return; + } + + if(--_dispatchCount == 0) + { + notifyAll(); + } + + if(_state == StateClosing && _dispatchCount == 0) + { + initiateShutdown(); + } + } + catch(Ice.LocalException ex) + { + setState(StateClosed, ex); + } } public int @@ -364,56 +469,40 @@ public final class Connection extends EventHandler return _endpoint; } - public void + public synchronized void setAdapter(Ice.ObjectAdapter adapter) { - _mutex.lock(); - try - { - // - // We are registered with a thread pool in active and closing - // mode. However, we only change subscription if we're in active - // mode, and thus ignore closing mode here. - // - if(_state == StateActive) - { - if(adapter != null && _adapter == null) - { - // - // Client is now server. - // - unregisterWithPool(); - } - - if(adapter == null && _adapter != null) - { - // - // Server is now client. - // - unregisterWithPool(); - } - } - - _adapter = adapter; - } - finally - { - _mutex.unlock(); - } + // + // We are registered with a thread pool in active and closing + // mode. However, we only change subscription if we're in active + // mode, and thus ignore closing mode here. + // + if(_state == StateActive) + { + if(adapter != null && _adapter == null) + { + // + // Client is now server. + // + unregisterWithPool(); + } + + if(adapter == null && _adapter != null) + { + // + // Server is now client. + // + unregisterWithPool(); + } + } + + _adapter = adapter; } - public Ice.ObjectAdapter + public synchronized Ice.ObjectAdapter getAdapter() { - _mutex.lock(); - try - { - return _adapter; - } - finally - { - _mutex.unlock(); - } + return _adapter; } // @@ -442,11 +531,10 @@ public final class Connection extends EventHandler public void message(BasicStream stream, ThreadPool threadPool) { - Incoming in = null; - boolean batch = false; + int invoke = 0; + int requestId = 0; - _mutex.lock(); - try + synchronized(this) { threadPool.promoteFollower(); @@ -483,7 +571,9 @@ public final class Connection extends EventHandler else { TraceUtil.traceRequest("received request", stream, _logger, _traceLevels); - in = getIncoming(); + requestId = stream.readInt(); + invoke = 1; + ++_dispatchCount; } break; } @@ -499,8 +589,12 @@ public final class Connection extends EventHandler else { TraceUtil.traceBatchRequest("received batch request", stream, _logger, _traceLevels); - in = getIncoming(); - batch = true; + invoke = stream.readInt(); + if(invoke < 0) + { + throw new Ice.NegativeSizeException(); + } + _dispatchCount += invoke; } break; } @@ -508,7 +602,7 @@ public final class Connection extends EventHandler case Protocol.replyMsg: { TraceUtil.traceReply("received reply", stream, _logger, _traceLevels); - int requestId = stream.readInt(); + requestId = stream.readInt(); Outgoing out = (Outgoing)_requests.remove(requestId); if(out == null) { @@ -562,111 +656,58 @@ public final class Connection extends EventHandler return; } } - finally - { - _mutex.unlock(); - } // // Method invocation must be done outside the thread // synchronization, so that nested calls are possible. // - if(in != null) + if(invoke > 0) { + Incoming in = null; + try { // // Prepare the invocation. // + boolean response = !_endpoint.datagram() && requestId != 0; + in = getIncoming(response); BasicStream is = in.is(); stream.swap(is); - BasicStream os = null; + BasicStream os = in.os(); try { // // Prepare the response if necessary. // - if(!batch) + if(response) { - int requestId = is.readInt(); - if(!_endpoint.datagram() && requestId != 0) // 0 means oneway. - { - ++_responseCount; - os = in.os(); - os.writeBlob(_replyHdr); - os.writeInt(requestId); - } + assert(invoke == 1); + os.writeBlob(_replyHdr); + + // + // Fill in the request ID. + // + os.writeInt(requestId); } // - // Do the invocation, or multiple invocations for - // batch messages. + // Do the invocation, or multiple invocations for batch + // messages. // - do + while(invoke-- > 0) { - in.invoke(os != null); + in.invoke(); } - while(batch && is.pos() < is.size()); } catch(Ice.LocalException ex) { - reclaimIncoming(in); // Must be called outside of synchronization on _mutex. + reclaimIncoming(in); // Must be called outside the synchronization. in = null; - _mutex.lock(); - try + synchronized(this) { setState(StateClosed, ex); - return; - } - finally - { - _mutex.unlock(); - } - } - - // - // Send a response if necessary. - // - if(os != null) - { - _mutex.lock(); - - try - { - try - { - if(_state == StateClosed) - { - return; - } - - // - // Fill in the message size. - // - os.pos(3); - final int sz = os.size(); - os.writeInt(sz); - - TraceUtil.traceReply("sending reply", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); - - --_responseCount; - - if(_state == StateClosing && _responseCount == 0) - { - initiateShutdown(); - } - } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - return; - } - } - finally - { - _mutex.unlock(); } } } @@ -674,52 +715,39 @@ public final class Connection extends EventHandler { if(in != null) { - reclaimIncoming(in); // Must be called outside of synchronization on _mutex. + reclaimIncoming(in); // Must be called outside the synchronization. } } } } - public void + public synchronized void finished(ThreadPool threadPool) { - _mutex.lock(); - try - { - threadPool.promoteFollower(); - - if(_state == StateActive || _state == StateClosing) - { - registerWithPool(); - } - else if(_state == StateClosed) - { - _transceiver.close(); - } - } - finally - { - _mutex.unlock(); - } + threadPool.promoteFollower(); + + if(_state == StateActive || _state == StateClosing) + { + registerWithPool(); + } + else if(_state == StateClosed) + { + _transceiver.close(); + _transceiver = null; + notifyAll(); + } } - public void + public synchronized void exception(Ice.LocalException ex) { - _mutex.lock(); - try - { - setState(StateClosed, ex); - } - finally - { - _mutex.unlock(); - } + setState(StateClosed, ex); } - public String + public synchronized String toString() { + assert(_transceiver != null); return _transceiver.toString(); } @@ -731,21 +759,25 @@ public final class Connection extends EventHandler _adapter = adapter; _logger = instance.logger(); _traceLevels = instance.traceLevels(); + _registeredWithPool = false; _warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; _nextRequestId = 1; _batchStream = new BasicStream(instance); - _responseCount = 0; - _usageCount = 0; + _batchStreamInUse = false; + _batchRequestNum = 0; + _dispatchCount = 0; + _proxyCount = 0; _state = StateHolding; - _registeredWithPool = false; } protected void finalize() throws Throwable { - assert(_usageCount == 0); assert(_state == StateClosed); + assert(_transceiver == null); + assert(_dispatchCount == 0); + assert(_proxyCount == 0); assert(_incomingCache == null); // @@ -757,39 +789,6 @@ public final class Connection extends EventHandler super.finalize(); } - // DestructionReason - public final static int ObjectAdapterDeactivated = 0; - public final static int CommunicatorDestroyed = 1; - - public void - destroy(int reason) - { - _mutex.lock(); - try - { - _batchStream.destroy(); - - switch(reason) - { - case ObjectAdapterDeactivated: - { - setState(StateClosing, new Ice.ObjectAdapterDeactivatedException()); - break; - } - - case CommunicatorDestroyed: - { - setState(StateClosing, new Ice.CommunicatorDestroyedException()); - break; - } - } - } - finally - { - _mutex.unlock(); - } - } - private static final int StateActive = 0; private static final int StateHolding = 1; private static final int StateClosing = 2; @@ -902,13 +901,15 @@ public final class Connection extends EventHandler } unregisterWithPool(); destroyIncomingCache(); + _dispatchCount = 0; break; } } _state = state; + notifyAll(); - if(_state == StateClosing && _responseCount == 0) + if(_state == StateClosing && _dispatchCount == 0) { try { @@ -925,7 +926,7 @@ public final class Connection extends EventHandler initiateShutdown() { assert(_state == StateClosing); - assert(_responseCount == 0); + assert(_dispatchCount == 0); if(!_endpoint.datagram()) { @@ -1013,7 +1014,7 @@ public final class Connection extends EventHandler } private Incoming - getIncoming() + getIncoming(boolean response) { Incoming in = null; @@ -1021,14 +1022,14 @@ public final class Connection extends EventHandler { if(_incomingCache == null) { - in = new Incoming(_instance, _adapter); + in = new Incoming(_instance, _adapter, this, response); } else { in = _incomingCache; _incomingCache = _incomingCache.next; in.next = null; - in.reset(_adapter); + in.reset(_adapter, this, response); } } @@ -1038,8 +1039,6 @@ public final class Connection extends EventHandler private void reclaimIncoming(Incoming in) { - in.finished(); - synchronized(_incomingCacheMutex) { in.next = _incomingCache; @@ -1065,23 +1064,34 @@ public final class Connection extends EventHandler } } - private final Transceiver _transceiver; + private Transceiver _transceiver; private final Endpoint _endpoint; + private Ice.ObjectAdapter _adapter; + private final Ice.Logger _logger; private final TraceLevels _traceLevels; + private ThreadPool _clientThreadPool; private ThreadPool _serverThreadPool; + private boolean _registeredWithPool; + private final boolean _warn; + private int _nextRequestId; private IntMap _requests = new IntMap(); + private Ice.LocalException _exception; + private BasicStream _batchStream; - private int _responseCount; - private int _usageCount; + private boolean _batchStreamInUse; + private int _batchRequestNum; + + private int _dispatchCount; // The number of requests currently being dispatched. + private int _proxyCount; // The number of proxies using this connection. + private int _state; - private boolean _registeredWithPool; - private RecursiveMutex _mutex = new RecursiveMutex(); + private Incoming _incomingCache; private java.lang.Object _incomingCacheMutex = new java.lang.Object(); } diff --git a/java/src/IceInternal/Direct.java b/java/src/IceInternal/Direct.java index 031bf58bc94..05efb96f406 100644 --- a/java/src/IceInternal/Direct.java +++ b/java/src/IceInternal/Direct.java @@ -21,10 +21,10 @@ public final class Direct { _current = current; + ((Ice.ObjectAdapterI)(_current.adapter)).incDirectCount(); + try { - ((Ice.ObjectAdapterI)(_current.adapter)).incUsageCount(); - _servant = _current.adapter.identityToServant(_current.id); if(_servant == null && _current.id.category.length() > 0) @@ -32,7 +32,7 @@ public final class Direct _locator = _current.adapter.findServantLocator(_current.id.category); if(_locator != null) { - _cookie = new Ice.LocalObjectHolder(); // Lazy creation + _cookie = new Ice.LocalObjectHolder(); // Lazy creation. _servant = _locator.locate(_current, _cookie); } } @@ -42,7 +42,7 @@ public final class Direct _locator = _current.adapter.findServantLocator(""); if(_locator != null) { - _cookie = new Ice.LocalObjectHolder(); // Lazy creation + _cookie = new Ice.LocalObjectHolder(); // Lazy creation. _servant = _locator.locate(_current, _cookie); } } @@ -81,7 +81,7 @@ public final class Direct } finally { - ((Ice.ObjectAdapterI)(_current.adapter)).decUsageCount(); + ((Ice.ObjectAdapterI)(_current.adapter)).decDirectCount(); } } } @@ -98,7 +98,7 @@ public final class Direct } finally { - ((Ice.ObjectAdapterI)(_current.adapter)).decUsageCount(); + ((Ice.ObjectAdapterI)(_current.adapter)).decDirectCount(); } } @@ -115,7 +115,7 @@ public final class Direct } } - private Ice.Current _current; + private final Ice.Current _current; private Ice.Object _servant; private Ice.Object _facetServant; private Ice.ServantLocator _locator; diff --git a/java/src/IceInternal/Incoming.java b/java/src/IceInternal/Incoming.java index 4694b464b44..c7857ba4bbc 100644 --- a/java/src/IceInternal/Incoming.java +++ b/java/src/IceInternal/Incoming.java @@ -17,33 +17,16 @@ package IceInternal; public class Incoming { public - Incoming(Instance instance, Ice.ObjectAdapter adapter) + Incoming(Instance instance, Ice.ObjectAdapter adapter, Connection connection, boolean response) { - _is = new BasicStream(instance); - _os = new BasicStream(instance); _current = new Ice.Current(); - _current.adapter = adapter; _current.id = new Ice.Identity(); + _current.adapter = adapter; _cookie = new Ice.LocalObjectHolder(); - - if(_current.adapter != null) - { - ((Ice.ObjectAdapterI)(_current.adapter)).incUsageCount(); - } - } - - // - // Must be called immediately after this object is no longer - // needed, in order to update the object adapter usage count. - // - public void - finished() - { - if(_current.adapter != null) - { - ((Ice.ObjectAdapterI)(_current.adapter)).decUsageCount(); - _current.adapter = null; - } + _connection = connection; + _response = response; + _is = new BasicStream(instance); + _os = new BasicStream(instance); } // @@ -51,23 +34,20 @@ public class Incoming // reallocated. // public void - reset(Ice.ObjectAdapter adapter) + reset(Ice.ObjectAdapter adapter, Connection connection, boolean response) { - _is.reset(); - _os.reset(); + _current.adapter = adapter; if(_current.ctx != null) { _current.ctx.clear(); } - - //assert(_current.adapter == null); // finished() should have been called - - _current.adapter = adapter; - - if(_current.adapter != null) - { - ((Ice.ObjectAdapterI)(_current.adapter)).incUsageCount(); - } + _servant = null; + _locator = null; + _cookie.value = null; + _connection = connection; + _response = response; + _is.reset(); + _os.reset(); } // @@ -81,7 +61,7 @@ public class Incoming } public void - invoke(boolean response) + invoke() { // // Read the current. @@ -104,16 +84,13 @@ public class Incoming _is.startReadEncaps(); - if(response) + if(_response) { assert(_os.size() == Protocol.headerSize + 4); // Dispatch status position. _os.writeByte((byte)0); _os.startWriteEncaps(); } - Ice.Object servant = null; - Ice.ServantLocator locator = null; - _cookie.value = null; DispatchStatus status; // @@ -126,28 +103,28 @@ public class Incoming { if(_current.adapter != null) { - servant = _current.adapter.identityToServant(_current.id); + _servant = _current.adapter.identityToServant(_current.id); - if(servant == null && _current.id.category.length() > 0) + if(_servant == null && _current.id.category.length() > 0) { - locator = _current.adapter.findServantLocator(_current.id.category); - if(locator != null) + _locator = _current.adapter.findServantLocator(_current.id.category); + if(_locator != null) { - servant = locator.locate(_current, _cookie); + _servant = _locator.locate(_current, _cookie); } } - if(servant == null) + if(_servant == null) { - locator = _current.adapter.findServantLocator(""); - if(locator != null) + _locator = _current.adapter.findServantLocator(""); + if(_locator != null) { - servant = locator.locate(_current, _cookie); + _servant = _locator.locate(_current, _cookie); } } } - if(servant == null) + if(_servant == null) { status = DispatchStatus.DispatchObjectNotExist; } @@ -155,7 +132,7 @@ public class Incoming { if(_current.facet.length > 0) { - Ice.Object facetServant = servant.ice_findFacetPath(_current.facet, 0); + Ice.Object facetServant = _servant.ice_findFacetPath(_current.facet, 0); if(facetServant == null) { status = DispatchStatus.DispatchFacetNotExist; @@ -167,19 +144,12 @@ public class Incoming } else { - status = servant.__dispatch(this, _current); + status = _servant.__dispatch(this, _current); } } } catch(Ice.RequestFailedException ex) { - if(locator != null && servant != null) - { - locator.finished(_current, servant, _cookie.value); - } - - _is.endReadEncaps(); - if(ex.id == null) { ex.id = _current.id; @@ -195,7 +165,9 @@ public class Incoming ex.operation = _current.operation; } - if(response) + warning(ex); + + if(_response) { _os.endWriteEncaps(); _os.resize(Protocol.headerSize + 4, false); // Dispatch status position. @@ -220,19 +192,14 @@ public class Incoming _os.writeString(ex.operation); } - warning(ex); + finishInvoke(); return; } catch(Ice.LocalException ex) { - if(locator != null && servant != null) - { - locator.finished(_current, servant, _cookie.value); - } - - _is.endReadEncaps(); + warning(ex); - if(response) + if(_response) { _os.endWriteEncaps(); _os.resize(Protocol.headerSize + 4, false); // Dispatch status position. @@ -240,7 +207,7 @@ public class Incoming _os.writeString(ex.toString()); } - warning(ex); + finishInvoke(); return; } /* Not possible in Java - UserExceptions are checked exceptions @@ -251,14 +218,9 @@ public class Incoming */ catch(RuntimeException ex) { - if(locator != null && servant != null) - { - locator.finished(_current, servant, _cookie.value); - } - - _is.endReadEncaps(); + warning(ex); - if(response) + if(_response) { _os.endWriteEncaps(); _os.resize(Protocol.headerSize + 4, false); // Dispatch status position. @@ -266,18 +228,17 @@ public class Incoming _os.writeString(ex.toString()); } - warning(ex); + finishInvoke(); return; } - if(locator != null && servant != null) - { - locator.finished(_current, servant, _cookie.value); - } - - _is.endReadEncaps(); + // + // Don't put the code below into the try block above. Exceptions + // in the code below are considered fatal, and must propagate to + // the caller of this operation. + // - if(response) + if(_response) { _os.endWriteEncaps(); @@ -302,6 +263,8 @@ public class Incoming _os.pos(save); } } + + finishInvoke(); } public BasicStream @@ -317,6 +280,31 @@ public class Incoming } private void + finishInvoke() + { + if(_locator != null && _servant != null) + { + _locator.finished(_current, _servant, _cookie.value); + } + + _is.endReadEncaps(); + + // + // Send a response if necessary. If we don't need to send a + // response, we still need to tell the connection that we're + // finished with dispatching. + // + if(_response) + { + _connection.sendResponse(_os); + } + else + { + _connection.sendNoResponse(); + } + } + + private void warning(Exception ex) { if(_os.instance().properties().getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 0) @@ -356,10 +344,16 @@ public class Incoming } } - private BasicStream _is; - private BasicStream _os; private Ice.Current _current; + private Ice.Object _servant; + private Ice.ServantLocator _locator; private Ice.LocalObjectHolder _cookie; - Incoming next; // For use by Connection + private Connection _connection; + private boolean _response; + + private BasicStream _is; + private BasicStream _os; + + Incoming next; // For use by Connection. } diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index 5bb7816ef96..ce96f78d123 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -17,15 +17,84 @@ package IceInternal; public class IncomingConnectionFactory extends EventHandler { public synchronized void + activate() + { + setState(StateActive); + } + + public synchronized void hold() { setState(StateHolding); } public synchronized void - activate() + destroy() { - setState(StateActive); + setState(StateClosed); + } + + public synchronized void + waitUntilHolding() + { + // + // First we wait until the connection factory itself is in + // holding state. + // + while(_state < StateHolding) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + // + // Now we wait until each connection is in holding state. + // + java.util.ListIterator iter = _connections.listIterator(); + while(iter.hasNext()) + { + Connection connection = (Connection)iter.next(); + connection.waitUntilHolding(); + } + } + + public synchronized void + waitUntilFinished() + { + // + // First we wait until the factory is destroyed. + // + while(_acceptor != null) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + // + // Now we wait for until the destruction of each connection is + // finished. + // + java.util.ListIterator iter = _connections.listIterator(); + while(iter.hasNext()) + { + Connection connection = (Connection)iter.next(); + connection.waitUntilFinished(); + } + + // + // We're done, now we can throw away all connections. + // + _connections.clear(); } public Endpoint @@ -50,21 +119,23 @@ public class IncomingConnectionFactory extends EventHandler public synchronized Connection[] connections() { - // - // Reap destroyed connections. - // + java.util.LinkedList connections = new java.util.LinkedList(); + + // + // Only copy connections which have not been destroyed. + // java.util.ListIterator iter = _connections.listIterator(); while(iter.hasNext()) { Connection connection = (Connection)iter.next(); - if(connection.destroyed()) + if(!connection.isDestroyed()) { - iter.remove(); + connections.add(connection); } } - Connection[] arr = new Connection[_connections.size()]; - _connections.toArray(arr); + Connection[] arr = new Connection[connections.size()]; + connections.toArray(arr); return arr; } @@ -93,14 +164,14 @@ public class IncomingConnectionFactory extends EventHandler return; } - // - // Reap destroyed connections. - // + // + // Reap connections for which destruction has completed. + // java.util.ListIterator iter = _connections.listIterator(); while(iter.hasNext()) { Connection connection = (Connection)iter.next(); - if(connection.destroyed()) + if(connection.isFinished()) { iter.remove(); } @@ -169,20 +240,16 @@ public class IncomingConnectionFactory extends EventHandler { threadPool.promoteFollower(); - if(_state == StateActive) - { - registerWithPool(); - } - else if(_state == StateClosed) - { - _acceptor.close(); - - // - // We don't need the adapter anymore after we closed the - // acceptor. - // - _adapter = null; - } + if(_state == StateActive) + { + registerWithPool(); + } + else if(_state == StateClosed) + { + _acceptor.close(); + _acceptor = null; + notifyAll(); + } } public void @@ -191,7 +258,8 @@ public class IncomingConnectionFactory extends EventHandler assert(false); // Must not be called. } - public String toString() + public synchronized String + toString() { if(_transceiver != null) { @@ -208,9 +276,9 @@ public class IncomingConnectionFactory extends EventHandler super(instance); _endpoint = endpoint; _adapter = adapter; + _registeredWithPool = false; _warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; _state = StateHolding; - _registeredWithPool = false; DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); if(defaultsAndOverrides.overrideTimeout) @@ -229,12 +297,6 @@ public class IncomingConnectionFactory extends EventHandler Connection connection = new Connection(_instance, _transceiver, _endpoint, _adapter); connection.validate(); _connections.add(connection); - - // - // We don't need an adapter anymore if we don't use an - // acceptor. - // - _adapter = null; } else { @@ -256,8 +318,9 @@ public class IncomingConnectionFactory extends EventHandler finalize() throws Throwable { - assert(_state == StateClosed); - assert(_adapter == null); + assert(_state == StateClosed); + assert(_acceptor == null); + assert(_connections.size() == 0); // // Destroy the EventHandler's stream, so that its buffer @@ -268,12 +331,6 @@ public class IncomingConnectionFactory extends EventHandler super.finalize(); } - public synchronized void - destroy() - { - setState(StateClosed); - } - private static final int StateActive = 0; private static final int StateHolding = 1; private static final int StateClosed = 2; @@ -307,7 +364,7 @@ public class IncomingConnectionFactory extends EventHandler case StateHolding: { - if(_state != StateActive) // Can only switch from active to holding + if(_state != StateActive) // Can only switch from active to holding. { return; } @@ -340,13 +397,12 @@ public class IncomingConnectionFactory extends EventHandler Connection connection = (Connection)iter.next(); connection.destroy(Connection.ObjectAdapterDeactivated); } - _connections.clear(); - - break; + break; } } _state = state; + notifyAll(); } private void @@ -392,13 +448,18 @@ public class IncomingConnectionFactory extends EventHandler _instance.logger().warning(s); } - private Endpoint _endpoint; - private Ice.ObjectAdapter _adapter; // Cannot be final, because it must be set to null to break cyclic dependency. private Acceptor _acceptor; private final Transceiver _transceiver; + private Endpoint _endpoint; + + private final Ice.ObjectAdapter _adapter; + private ThreadPool _serverThreadPool; + private boolean _registeredWithPool; + private final boolean _warn; + private java.util.LinkedList _connections = new java.util.LinkedList(); + private int _state; - private boolean _registeredWithPool; } diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index 10c0b4bd3e2..3303a6da684 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -265,113 +265,59 @@ public class Instance public void destroy() { - ThreadPool clientThreadPool; - ThreadPool serverThreadPool; - + assert(!_destroyed); + + _objectAdapterFactory.shutdown(); + _objectAdapterFactory.waitForShutdown(); + + _outgoingConnectionFactory.destroy(); + _outgoingConnectionFactory.waitUntilFinished(); + synchronized(this) { - if(_destroyed) - { - return; // Don't destroy twice. - } + _objectAdapterFactory = null; + _outgoingConnectionFactory = null; - _destroyed = true; - - if(_objectAdapterFactory != null) + if(_serverThreadPool != null) { - // Don't shut down the object adapters -- the communicator - // must do this before it destroys this object. - _objectAdapterFactory = null; + _serverThreadPool.destroy(); + _serverThreadPool.joinWithAllThreads(); + _serverThreadPool = null; } - if(_outgoingConnectionFactory != null) + if(_clientThreadPool != null) { - _outgoingConnectionFactory.destroy(); - _outgoingConnectionFactory = null; - } - - // - // We destroy the thread pool outside the thread - // synchronization. - // - clientThreadPool = _clientThreadPool; - _clientThreadPool = null; - serverThreadPool = _serverThreadPool; - _serverThreadPool = null; - } - - // - // We must destroy the outgoing connection factory before we - // destroy the client thread pool. - // - if(clientThreadPool != null) - { - clientThreadPool.waitUntilFinished(); - clientThreadPool.destroy(); - clientThreadPool.joinWithAllThreads(); - } - - // - // We must destroy the object adapter factory before we destroy - // the server thread pool. - // - if(serverThreadPool != null) - { - serverThreadPool.waitUntilFinished(); - serverThreadPool.destroy(); - serverThreadPool.joinWithAllThreads(); - } - - synchronized(this) - { - if(_servantFactoryManager != null) - { - _servantFactoryManager.destroy(); - _servantFactoryManager = null; + _clientThreadPool.destroy(); + _clientThreadPool.joinWithAllThreads(); + _clientThreadPool = null; } - if(_userExceptionFactoryManager != null) - { - _userExceptionFactoryManager.destroy(); - _userExceptionFactoryManager = null; - } + _servantFactoryManager.destroy(); + _servantFactoryManager = null; - if(_referenceFactory != null) - { - _referenceFactory.destroy(); - _referenceFactory = null; - } + _userExceptionFactoryManager.destroy(); + _userExceptionFactoryManager = null; - if(_proxyFactory != null) - { - // No destroy function defined - // _proxyFactory.destroy(); - _proxyFactory = null; - } - - if(_routerManager != null) - { - _routerManager.destroy(); - _routerManager = null; - } - - if(_locatorManager != null) - { - _locatorManager.destroy(); - _locatorManager = null; - } - - if(_endpointFactoryManager != null) - { - _endpointFactoryManager.destroy(); - _endpointFactoryManager = null; - } - - if(_pluginManager != null) - { - _pluginManager.destroy(); - _pluginManager = null; - } + _referenceFactory.destroy(); + _referenceFactory = null; + + // No destroy function defined. + // _proxyFactory.destroy(); + _proxyFactory = null; + + _routerManager.destroy(); + _routerManager = null; + + _locatorManager.destroy(); + _locatorManager = null; + + _endpointFactoryManager.destroy(); + _endpointFactoryManager = null; + + _pluginManager.destroy(); + _pluginManager = null; + + _destroyed = true; } } diff --git a/java/src/IceInternal/ObjectAdapterFactory.java b/java/src/IceInternal/ObjectAdapterFactory.java index bec9d78f9cb..518d16932bd 100644 --- a/java/src/IceInternal/ObjectAdapterFactory.java +++ b/java/src/IceInternal/ObjectAdapterFactory.java @@ -19,6 +19,15 @@ public final class ObjectAdapterFactory public synchronized void shutdown() { + // + // Ignore shutdown requests if the object adapter factory has + // already been shut down. + // + if(_instance == null) + { + return; + } + java.util.Iterator i = _adapters.values().iterator(); while(i.hasNext()) { @@ -26,12 +35,53 @@ public final class ObjectAdapterFactory adapter.deactivate(); } - _adapters.clear(); + _instance = null; + _communicator = null; + + notifyAll(); } + public synchronized void + waitForShutdown() + { + // + // First we wait for the shutdown of the factory itself. + // + while(_instance != null) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + // + // Now we wait for deactivation of each object adapter. + // + java.util.Iterator i = _adapters.values().iterator(); + while(i.hasNext()) + { + Ice.ObjectAdapter adapter = (Ice.ObjectAdapter)i.next(); + adapter.waitForDeactivate(); + } + + // + // We're done, now we can throw away the object adapters. + // + _adapters.clear(); + } + public synchronized Ice.ObjectAdapter createObjectAdapter(String name, String endpts, String id) { + if(_instance == null) + { + throw new Ice.CommunicatorDestroyedException(); + } + Ice.ObjectAdapter adapter = (Ice.ObjectAdapter)_adapters.get(name); if(adapter != null) { @@ -46,21 +96,33 @@ public final class ObjectAdapterFactory public synchronized Ice.ObjectAdapter findObjectAdapter(Ice.ObjectPrx proxy) { + if(_instance == null) + { + throw new Ice.CommunicatorDestroyedException(); + } + java.util.Iterator i = _adapters.values().iterator(); while(i.hasNext()) { Ice.ObjectAdapterI adapter = (Ice.ObjectAdapterI)i.next(); - if(adapter.isLocal(proxy)) - { - return adapter; - } - } + try + { + if(adapter.isLocal(proxy)) + { + return adapter; + } + } + catch(Ice.ObjectAdapterDeactivatedException ex) + { + // Ignore. + } + } return null; } // - // Only for use by Instance + // Only for use by Instance. // ObjectAdapterFactory(Instance instance, Ice.Communicator communicator) { @@ -68,6 +130,17 @@ public final class ObjectAdapterFactory _communicator = communicator; } + protected void + finalize() + throws Throwable + { + assert(_instance == null); + assert(_communicator == null); + assert(_adapters.size() == 0); + + super.finalize(); + } + private Instance _instance; private Ice.Communicator _communicator; private java.util.HashMap _adapters = new java.util.HashMap(); diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 09f0f317c29..2c4b5d0e6c3 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -16,6 +16,58 @@ package IceInternal; public class OutgoingConnectionFactory { + public synchronized void + destroy() + { + if(_instance == null) + { + return; + } + + java.util.Iterator p = _connections.values().iterator(); + while(p.hasNext()) + { + Connection connection = (Connection)p.next(); + connection.destroy(Connection.CommunicatorDestroyed); + } + + _instance = null; + } + + public synchronized void + waitUntilFinished() + { + // + // First we wait until the factory is destroyed. + // + while(_instance != null) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + // + // Now we wait for until the destruction of each connection is + // finished. + // + java.util.Iterator p = _connections.values().iterator(); + while(p.hasNext()) + { + Connection connection = (Connection)p.next(); + connection.waitUntilFinished(); + } + + // + // We're done, now we can throw away all connections. + // + _connections.clear(); + } + public synchronized Connection create(Endpoint[] endpoints) { @@ -26,14 +78,14 @@ public class OutgoingConnectionFactory assert(endpoints.length > 0); - // - // Reap destroyed connections. - // + // + // Reap connections for which destruction has completed. + // java.util.Iterator p = _connections.values().iterator(); while(p.hasNext()) { Connection connection = (Connection)p.next(); - if(connection.destroyed()) + if(connection.isFinished()) { p.remove(); } @@ -54,12 +106,19 @@ public class OutgoingConnectionFactory Connection connection = (Connection)_connections.get(endpoint); if(connection != null) { - return connection; + // + // Don't return connections for which destruction has + // been initiated. + // + if(!connection.isDestroyed()) + { + return connection; + } } } // - // No connections exist, try to create one + // No connections exist, try to create one. // TraceLevels traceLevels = _instance.traceLevels(); Ice.Logger logger = _instance.logger(); @@ -195,24 +254,6 @@ public class OutgoingConnectionFactory super.finalize(); } - public synchronized void - destroy() - { - if(_instance == null) - { - return; - } - - java.util.Iterator p = _connections.values().iterator(); - while(p.hasNext()) - { - Connection connection = (Connection)p.next(); - connection.destroy(Connection.CommunicatorDestroyed); - } - _connections.clear(); - _instance = null; - } - private Instance _instance; private java.util.HashMap _connections = new java.util.HashMap(); } diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 33b15884b42..4576880f122 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -29,11 +29,8 @@ public final class ThreadPool { if(TRACE_REGISTRATION) { - trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd + - ", handler count = " + (_handlers + 1)); + trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd); } - - ++_handlers; _changes.add(new FdHandlerPair(fd, handler)); setInterrupt(0); } @@ -88,41 +85,6 @@ public final class ThreadPool setInterrupt(1); } - public synchronized void - waitUntilFinished() - { - if(TRACE_SHUTDOWN) - { - trace("waiting until finished..."); - } - - while(_handlers != 0 && _threadNum != 0) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - if(_handlers != 0) - { - _instance.logger().error("can't wait for graceful application termination in thread pool\n" + - "since all threads have vanished"); - } - else - { - assert(_handlerMap.isEmpty()); - } - - if(TRACE_SHUTDOWN) - { - trace("finished."); - } - } - public void joinWithAllThreads() { @@ -155,7 +117,6 @@ public final class ThreadPool { _instance = instance; _destroyed = false; - _handlers = 0; _timeout = 0; _multipleThreads = false; _name = name; @@ -183,23 +144,24 @@ public final class ThreadPool // _keys = _selector.selectedKeys(); + int threadNum; if(server) { _timeout = _instance.properties().getPropertyAsInt("Ice.ServerIdleTime"); _timeoutMillis = _timeout * 1000; - _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10); + threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10); } else { - _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 1); + threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 1); } - if(_threadNum < 1) + if(threadNum < 1) { - _threadNum = 1; + threadNum = 1; } - if(_threadNum > 1) + if(threadNum > 1) { _multipleThreads = true; } @@ -216,8 +178,8 @@ public final class ThreadPool try { - _threads = new EventHandlerThread[_threadNum]; - for(int i = 0; i < _threadNum; i++) + _threads = new EventHandlerThread[threadNum]; + for(int i = 0; i < threadNum; i++) { _threads[i] = new EventHandlerThread(threadNamePrefix + _name + "-" + i); _threads[i].start(); @@ -387,8 +349,6 @@ public final class ThreadPool private void run(BasicStream stream) { - boolean shutdown = false; - while(true) { if(_multipleThreads) @@ -405,21 +365,6 @@ public final class ThreadPool while(true) { - if(shutdown) // Shutdown has been initiated. - { - if(TRACE_SHUTDOWN) - { - trace("shutdown detected"); - } - - shutdown = false; - ObjectAdapterFactory factory = _instance.objectAdapterFactory(); - if(factory != null) - { - factory.shutdown(); - } - } - if(TRACE_REGISTRATION) { java.util.Set keys = _selector.keys(); @@ -433,7 +378,7 @@ public final class ThreadPool } select(); - if(_keys.size() == 0) // Timeout. + if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout. { if(TRACE_SELECT) { @@ -443,12 +388,13 @@ public final class ThreadPool assert(_timeout > 0); _timeout = 0; _timeoutMillis = 0; - shutdown = true; + initiateShutdown(); continue repeatSelect; } EventHandler handler = null; boolean finished = false; + boolean shutdown = false; synchronized(this) { @@ -493,71 +439,68 @@ public final class ThreadPool shutdown = clearInterrupt(); - // - // Server shutdown? - // - if(shutdown) + if(!shutdown) { - continue repeatSelect; - } - - // - // An event handler must have been registered or - // unregistered. - // - assert(!_changes.isEmpty()); - FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); - - if(change.handler != null) // Addition if handler is set. - { - int op; - if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) - { - op = java.nio.channels.SelectionKey.OP_READ; - } - else - { - op = java.nio.channels.SelectionKey.OP_ACCEPT; - } - - java.nio.channels.SelectionKey key = null; - try - { - key = change.fd.register(_selector, op, change.handler); - } - catch(java.nio.channels.ClosedChannelException ex) - { - assert(false); - } - _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key)); - - if(TRACE_REGISTRATION) - { - trace("added handler (" + change.handler.getClass().getName() + ") for fd " + - change.fd); - } - - continue repeatSelect; - } - else // Removal if handler is not set. - { - HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd); - assert(pair != null); - handler = pair.handler; - finished = true; - pair.key.cancel(); - - if(TRACE_REGISTRATION) - { - trace("removed handler (" + handler.getClass().getName() + ") for fd " + change.fd); - } - - // Don't goto repeatSelect; we have to call - // finished() on the event handler below, outside - // the thread synchronization. - } - } - else + // + // An event handler must have been + // registered or unregistered. + // + assert(!_changes.isEmpty()); + FdHandlerPair change = (FdHandlerPair)_changes.removeFirst(); + + if(change.handler != null) // Addition if handler is set. + { + int op; + if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) + { + op = java.nio.channels.SelectionKey.OP_READ; + } + else + { + op = java.nio.channels.SelectionKey.OP_ACCEPT; + } + + java.nio.channels.SelectionKey key = null; + try + { + key = change.fd.register(_selector, op, change.handler); + } + catch(java.nio.channels.ClosedChannelException ex) + { + assert(false); + } + _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key)); + + if(TRACE_REGISTRATION) + { + trace("added handler (" + change.handler.getClass().getName() + ") for fd " + + change.fd); + } + + continue repeatSelect; + } + else // Removal if handler is not set. + { + HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd); + assert(pair != null); + handler = pair.handler; + finished = true; + pair.key.cancel(); + + if(TRACE_REGISTRATION) + { + trace("removed handler (" + handler.getClass().getName() + ") for fd " + + change.fd); + } + + // Don't goto repeatSelect; we have to + // call finished() on the event + // handler below, outside the thread + // synchronization. + } + } + } + else { java.nio.channels.SelectionKey key = null; java.util.Iterator iter = _keys.iterator(); @@ -594,80 +537,92 @@ public final class ThreadPool } } - assert(handler != null); + assert(handler != null || shutdown); - if(finished) + if(shutdown) // Shutdown has been initiated. { - // - // Notify a handler about it's removal from the thread - // pool. - // - try - { - handler.finished(this); - } - catch(Ice.LocalException ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception while calling finished():\n" + sw.toString() + "\n" + handler.toString(); - _instance.logger().error(s); - } - - synchronized(this) + if(TRACE_SHUTDOWN) { - assert(_handlers > 0); - if(--_handlers == 0) - { - notifyAll(); // For waitUntilFinished(). - } + trace("shutdown detected"); } - } - else - { - // - // If the handler is "readable", try to read a message. - // - try - { - if(handler.readable()) - { - try - { - read(handler); - } - catch(Ice.TimeoutException ex) // Expected - { - continue repeatSelect; - } - catch(Ice.LocalException ex) - { - if(TRACE_EXCEPTION) - { - trace("informing handler (" + handler.getClass().getName() + - ") about exception " + ex); - ex.printStackTrace(); - } - - handler.exception(ex); - continue repeatSelect; - } - stream.swap(handler._stream); - assert(stream.pos() == stream.size()); - } + ObjectAdapterFactory factory = _instance.objectAdapterFactory(); + if(factory == null) + { + continue repeatSelect; + } - handler.message(stream, this); - } - finally - { - stream.reset(); - } + promoteFollower(); + factory.shutdown(); } - - break; // inner while loop + else + { + if(finished) + { + // + // Notify a handler about it's removal from + // the thread pool. + // + try + { + handler.finished(this); + } + catch(Ice.LocalException ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "exception while calling finished():\n" + sw.toString() + "\n" + + handler.toString(); + _instance.logger().error(s); + } + } + else + { + // + // If the handler is "readable", try to read a + // message. + // + try + { + if(handler.readable()) + { + try + { + read(handler); + } + catch(Ice.TimeoutException ex) // Expected. + { + continue repeatSelect; + } + catch(Ice.LocalException ex) + { + if(TRACE_EXCEPTION) + { + trace("informing handler (" + handler.getClass().getName() + + ") about exception " + ex); + ex.printStackTrace(); + } + + handler.exception(ex); + continue repeatSelect; + } + + stream.swap(handler._stream); + assert(stream.pos() == stream.size()); + } + + handler.message(stream, this); + } + finally + { + stream.reset(); + } + } + } + + break; // Inner while loop. } } } @@ -890,7 +845,6 @@ public final class ThreadPool private java.util.Set _keys; private java.util.LinkedList _changes = new java.util.LinkedList(); private java.util.HashMap _handlerMap = new java.util.HashMap(); - private int _handlers; private int _timeout; private int _timeoutMillis; private RecursiveMutex _threadMutex = new RecursiveMutex(); @@ -932,25 +886,6 @@ public final class ThreadPool _instance.logger().error(s); } - synchronized(ThreadPool.this) - { - --_threadNum; - assert(_threadNum >= 0); - - // - // The notifyAll() shouldn't be needed, *except* if one of the - // threads exits because of an exception. (Which is an error - // condition in Ice and if it happens needs to be debugged.) - // However, I call notifyAll() anyway, in all cases, using a - // "defensive" programming approach when it comes to - // multithreading. - // - if(_threadNum == 0) - { - ThreadPool.this.notifyAll(); // For waitUntil...Finished() methods. - } - } - if(TRACE_THREAD) { trace("run() terminated - promoting follower"); @@ -962,5 +897,4 @@ public final class ThreadPool } } private EventHandlerThread[] _threads; - private int _threadNum; // Number of running threads } diff --git a/java/src/IceInternal/TraceUtil.java b/java/src/IceInternal/TraceUtil.java index 9b7b842fa7c..509d8cd324b 100644 --- a/java/src/IceInternal/TraceUtil.java +++ b/java/src/IceInternal/TraceUtil.java @@ -71,11 +71,12 @@ final class TraceUtil s.write(heading); printHeader(s, str); - int cnt = 0; - while(str.pos() != str.size()) + int batchRequestNum = str.readInt(); + s.write("\nnumber of requests = " + batchRequestNum); + + for(int i = 0; i < batchRequestNum; ++i) { - s.write("\nrequest #" + cnt + ':'); - cnt++; + s.write("\nrequest #" + i + ':'); printRequestHeader(s, str); str.skipEncaps(); } |