summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/CommunicatorI.java28
-rw-r--r--java/src/Ice/ObjectAdapterI.java388
-rw-r--r--java/src/Ice/_ObjectDelM.java8
-rw-r--r--java/src/IceInternal/Connection.java884
-rw-r--r--java/src/IceInternal/Direct.java14
-rw-r--r--java/src/IceInternal/Incoming.java168
-rw-r--r--java/src/IceInternal/IncomingConnectionFactory.java161
-rw-r--r--java/src/IceInternal/Instance.java138
-rw-r--r--java/src/IceInternal/ObjectAdapterFactory.java87
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java89
-rw-r--r--java/src/IceInternal/ThreadPool.java370
-rw-r--r--java/src/IceInternal/TraceUtil.java9
12 files changed, 1180 insertions, 1164 deletions
diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java
index 5fc21f804ba..16602e18a28 100644
--- a/java/src/Ice/CommunicatorI.java
+++ b/java/src/Ice/CommunicatorI.java
@@ -22,11 +22,8 @@ class CommunicatorI extends LocalObjectImpl implements Communicator
if(!_destroyed) // Don't destroy twice.
{
_destroyed = true;
-
- _instance.objectAdapterFactory().shutdown();
- _instance.destroy();
-
_serverThreadPool = null;
+ _instance.destroy();
}
}
@@ -45,14 +42,23 @@ class CommunicatorI extends LocalObjectImpl implements Communicator
public void
waitForShutdown()
{
- //
- // No mutex locking here, otherwise the communicator is blocked
- // while waiting for shutdown.
- //
- if(_serverThreadPool != null)
+ IceInternal.ObjectAdapterFactory objectAdapterFactory;
+
+ synchronized(this)
{
- _serverThreadPool.waitUntilFinished();
+ if(_destroyed)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ objectAdapterFactory = _instance.objectAdapterFactory();
}
+
+ //
+ // We must call waitForShutdown on the object adapter factory
+ // outside the synchronization, otherwise the communicator is
+ // blocked while we wait for shutdown.
+ //
+ objectAdapterFactory.waitForShutdown();
}
public synchronized Ice.ObjectPrx
@@ -281,7 +287,7 @@ class CommunicatorI extends LocalObjectImpl implements Communicator
// We need _serverThreadPool directly in CommunicatorI. That's
// because the shutdown() operation is signal-safe, and thus must
// not access any mutex locks or _instance. It may only access
- // _serverThreadPool->initiateShutdown(), which is signal-safe as
+ // _serverThreadPool.initiateShutdown(), which is signal-safe as
// well.
//
private IceInternal.ThreadPool _serverThreadPool;
diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java
index af1f959e361..adbf3faf573 100644
--- a/java/src/Ice/ObjectAdapterI.java
+++ b/java/src/Ice/ObjectAdapterI.java
@@ -19,18 +19,16 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public String
getName()
{
- return _name; // _name is immutable
+ //
+ // No mutex lock necessary, _name is immutable.
+ //
+ return _name;
}
public synchronized Communicator
getCommunicator()
{
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
+ checkForDeactivation();
return _communicator;
}
@@ -38,13 +36,8 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized void
activate()
{
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
-
+ checkForDeactivation();
+
if(!_printAdapterReadyDone)
{
if(_locatorInfo != null && _id.length() > 0)
@@ -101,13 +94,8 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized void
hold()
{
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
-
+ checkForDeactivation();
+
final int sz = _incomingConnectionFactories.size();
for(int i = 0; i < sz; ++i)
{
@@ -120,72 +108,119 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized void
waitForHold()
{
- // TODO: Not implemented yet.
+ checkForDeactivation();
+
+ final int sz = _incomingConnectionFactories.size();
+ for(int i = 0; i < sz; ++i)
+ {
+ IceInternal.IncomingConnectionFactory factory =
+ (IceInternal.IncomingConnectionFactory)_incomingConnectionFactories.get(i);
+ factory.waitUntilHolding();
+ }
}
- public void
+ public synchronized void
deactivate()
{
- synchronized(this)
+ //
+ // Ignore deactivation requests if the object adapter has
+ // already been deactivated.
+ //
+ if(_instance == null)
{
- if(_instance == null)
- {
- //
- // Ignore deactivation requests if the Object Adapter has
- // already been deactivated.
- //
- return;
- }
-
- final int sz = _incomingConnectionFactories.size();
- for(int i = 0; i < sz; ++i)
- {
- IceInternal.IncomingConnectionFactory factory =
- (IceInternal.IncomingConnectionFactory)_incomingConnectionFactories.get(i);
- factory.destroy();
- }
- _incomingConnectionFactories.clear();
-
- _instance.outgoingConnectionFactory().removeAdapter(this);
-
- _instance = null;
- _communicator = null;
+ return;
}
-
- decUsageCount();
+
+ final int sz = _incomingConnectionFactories.size();
+ for(int i = 0; i < sz; ++i)
+ {
+ IceInternal.IncomingConnectionFactory factory =
+ (IceInternal.IncomingConnectionFactory)_incomingConnectionFactories.get(i);
+ factory.destroy();
+ }
+
+ _instance.outgoingConnectionFactory().removeAdapter(this);
+
+ _instance = null;
+ _communicator = null;
+
+ notifyAll();
}
public synchronized void
waitForDeactivate()
{
- assert(_usageCount >= 0);
-
- while(_usageCount > 0)
+ //
+ // First we wait for deactivation of the adapter itself, and
+ // for the return of all direct method calls using this
+ // adapter.
+ //
+ while(_instance != null || _directCount > 0)
{
try
{
wait();
}
- catch(java.lang.InterruptedException ex)
+ catch(InterruptedException ex)
{
}
}
- assert(_usageCount == 0);
+ //
+ // Now we wait for until all incoming connection factories are
+ // finished.
+ //
+ final int sz = _incomingConnectionFactories.size();
+ for(int i = 0; i < sz; ++i)
+ {
+ IceInternal.IncomingConnectionFactory factory =
+ (IceInternal.IncomingConnectionFactory)_incomingConnectionFactories.get(i);
+ factory.waitUntilFinished();
+ }
+
+ //
+ // We're done, now we can throw away all incoming connection
+ // factories.
+ //
+ _incomingConnectionFactories.clear();
+
+ //
+ // Now it's also time to clean up the active servant map.
+ //
+ _activeServantMap.clear();
+
+ //
+ // And the servant locators, too.
+ //
+ java.util.Iterator p = _locatorMap.entrySet().iterator();
+ while(p.hasNext())
+ {
+ java.util.Map.Entry e = (java.util.Map.Entry)p.next();
+ ServantLocator locator = (ServantLocator)e.getValue();
+ try
+ {
+ locator.deactivate();
+ }
+ catch(RuntimeException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "exception during locator deactivation:\n" + "object adapter: `" + _name + "'\n" +
+ "locator prefix: `" + e.getKey() + "'\n" + sw.toString();
+ _logger.error(s);
+ }
+ }
+ _locatorMap.clear();
}
public synchronized ObjectPrx
add(Ice.Object servant, Identity ident)
{
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
-
- checkIdentity(ident);
-
+ checkForDeactivation();
+ checkIdentity(ident);
+
Ice.Object o = (Ice.Object)_activeServantMap.get(ident);
if(o != null)
{
@@ -210,12 +245,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized ObjectPrx
addWithUUID(Ice.Object servant)
{
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
+ checkForDeactivation();
long now = System.currentTimeMillis();
Identity ident = new Identity();
@@ -230,13 +260,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized void
remove(Identity ident)
{
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
-
+ checkForDeactivation();
checkIdentity(ident);
Ice.Object o = (Ice.Object)_activeServantMap.get(ident);
@@ -254,12 +278,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized void
addServantLocator(ServantLocator locator, String prefix)
{
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
+ checkForDeactivation();
ServantLocator l = (ServantLocator)_locatorMap.get(prefix);
if(l != null)
@@ -276,12 +295,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized void
removeServantLocator(String prefix)
{
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
+ checkForDeactivation();
ServantLocator l = (ServantLocator)_locatorMap.get(prefix);
if(l == null)
@@ -302,20 +316,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized ServantLocator
findServantLocator(String prefix)
{
- //
- // Don't check whether deactivation has been initiated. This
- // operation might be called (e.g., from Incoming or Direct)
- // after deactivation has been initiated, but before
- // deactivation has been completed.
- //
- /*
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
- */
+ checkForDeactivation();
return (ServantLocator)_locatorMap.get(prefix);
}
@@ -323,20 +324,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized Ice.Object
identityToServant(Identity ident)
{
- //
- // Don't check whether deactivation has been initiated. This
- // operation might be called (e.g., from Incoming or Direct)
- // after deactivation has been initiated, but before
- // deactivation has been completed.
- //
- /*
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
- */
+ checkForDeactivation();
//
// Don't call checkIdentity. We simply want null to be
@@ -360,13 +348,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized ObjectPrx
createProxy(Identity ident)
{
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
-
+ checkForDeactivation();
checkIdentity(ident);
return newProxy(ident);
@@ -375,13 +357,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized ObjectPrx
createDirectProxy(Identity ident)
{
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
-
+ checkForDeactivation();
checkIdentity(ident);
return newDirectProxy(ident);
@@ -390,13 +366,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized ObjectPrx
createReverseProxy(Identity ident)
{
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
-
+ checkForDeactivation();
checkIdentity(ident);
//
@@ -413,12 +383,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized void
addRouter(RouterPrx router)
{
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
+ checkForDeactivation();
IceInternal.RouterInfo routerInfo = _instance.routerManager().get(router);
if(routerInfo != null)
@@ -463,12 +428,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized void
setLocator(LocatorPrx locator)
{
- if(_instance == null)
- {
- ObjectAdapterDeactivatedException e = new ObjectAdapterDeactivatedException();
- e.name = _name;
- throw e;
- }
+ checkForDeactivation();
_locatorInfo = _instance.locatorManager().get(locator);
}
@@ -476,10 +436,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
public synchronized IceInternal.Connection[]
getIncomingConnections()
{
- if(_instance == null)
- {
- throw new ObjectAdapterDeactivatedException();
- }
+ checkForDeactivation();
java.util.ArrayList connections = new java.util.ArrayList();
final int sz = _incomingConnectionFactories.size();
@@ -499,58 +456,28 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
}
public synchronized void
- incUsageCount()
+ incDirectCount()
{
- if(_instance == null)
- {
- throw new ObjectAdapterDeactivatedException();
- }
+ checkForDeactivation();
- assert(_usageCount >= 0);
- ++_usageCount;
+ assert(_directCount >= 0);
+ ++_directCount;
}
public synchronized void
- decUsageCount()
+ decDirectCount()
{
//
// The object adapter may already have been deactivated when
- // the usage count is decremented, thus there is no check for
+ // the direct count is decremented, thus there is no check for
// prior deactivation.
//
- assert(_usageCount > 0);
- --_usageCount;
-
- if(_usageCount == 0)
+ assert(_directCount > 0);
+ if(--_directCount == 0)
{
- _activeServantMap.clear();
-
- java.util.Iterator p = _locatorMap.entrySet().iterator();
- while(p.hasNext())
- {
- java.util.Map.Entry e = (java.util.Map.Entry)p.next();
- ServantLocator locator = (ServantLocator)e.getValue();
- try
- {
- locator.deactivate();
- }
- catch(RuntimeException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception during locator deactivation:\n" + "object adapter: `" + _name + "'\n" +
- "locator prefix: `" + e.getKey() + "'\n" + sw.toString();
- _logger.error(s);
- }
- }
-
- _locatorMap.clear();
-
notifyAll();
- }
+ }
}
//
@@ -565,7 +492,7 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
_name = name;
_id = id;
_logger = instance.logger();
- _usageCount = 1;
+ _directCount = 0;
String s = endpts.toLowerCase();
@@ -616,12 +543,18 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
finalize()
throws Throwable
{
- assert(_usageCount == 0);
-
if(_instance != null)
{
- _instance.logger().warning("object adapter has not been deactivated");
+ _instance.logger().warning("object adapter `" + _name + "' has not been deactivated");
}
+ else
+ {
+ assert(_communicator == null);
+ assert(_incomingConnectionFactories.size() == 0);
+ assert(_activeServantMap.size() == 0);
+ assert(_locatorMap.size() == 0);
+ assert(_directCount == 0);
+ }
super.finalize();
}
@@ -629,6 +562,8 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
private ObjectPrx
newProxy(Identity ident)
{
+ checkForDeactivation();
+
if(_id.length() == 0)
{
return newDirectProxy(ident);
@@ -651,6 +586,8 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
private ObjectPrx
newDirectProxy(Identity ident)
{
+ checkForDeactivation();
+
IceInternal.Endpoint[] endpoints =
new IceInternal.Endpoint[_incomingConnectionFactories.size() + _routerEndpoints.size()];
@@ -687,32 +624,11 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
return _instance.proxyFactory().referenceToProxy(reference);
}
- private static void
- checkIdentity(Identity ident)
- {
- if(ident.name == null || ident.name.length() == 0)
- {
- IllegalIdentityException e = new IllegalIdentityException();
- try
- {
- e.id = (Identity)ident.clone();
- }
- catch(CloneNotSupportedException ex)
- {
- assert(false);
- }
- throw e;
- }
-
- if(ident.category == null)
- {
- ident.category = "";
- }
- }
-
public boolean
isLocal(ObjectPrx proxy)
{
+ checkForDeactivation();
+
IceInternal.Reference ref = ((ObjectPrxHelper)proxy).__reference();
final IceInternal.Endpoint[] endpoints = ref.endpoints;
@@ -771,6 +687,40 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
return false;
}
+ private void
+ checkForDeactivation()
+ {
+ if(_instance == null)
+ {
+ ObjectAdapterDeactivatedException ex = new ObjectAdapterDeactivatedException();
+ ex.name = _name;
+ throw ex;
+ }
+ }
+
+ private static void
+ checkIdentity(Identity ident)
+ {
+ if(ident.name == null || ident.name.length() == 0)
+ {
+ IllegalIdentityException e = new IllegalIdentityException();
+ try
+ {
+ e.id = (Identity)ident.clone();
+ }
+ catch(CloneNotSupportedException ex)
+ {
+ assert(false);
+ }
+ throw e;
+ }
+
+ if(ident.category == null)
+ {
+ ident.category = "";
+ }
+ }
+
private IceInternal.Instance _instance;
private Communicator _communicator;
private boolean _printAdapterReadyDone;
@@ -782,5 +732,5 @@ public class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapter
private java.util.ArrayList _incomingConnectionFactories = new java.util.ArrayList();
private java.util.ArrayList _routerEndpoints = new java.util.ArrayList();
private IceInternal.LocatorInfo _locatorInfo;
- private int _usageCount;
+ private int _directCount;
}
diff --git a/java/src/Ice/_ObjectDelM.java b/java/src/Ice/_ObjectDelM.java
index 12aabfcd57a..37c7dc17119 100644
--- a/java/src/Ice/_ObjectDelM.java
+++ b/java/src/Ice/_ObjectDelM.java
@@ -164,7 +164,7 @@ public class _ObjectDelM implements _ObjectDel
__reference = from.__reference;
__connection = from.__connection;
- __connection.incUsageCount();
+ __connection.incProxyCount();
}
protected IceInternal.Connection __connection;
@@ -212,7 +212,7 @@ public class _ObjectDelM implements _ObjectDel
}
assert(j < connections.length);
__connection = connections[j];
- __connection.incUsageCount();
+ __connection.incProxyCount();
}
else
{
@@ -257,7 +257,7 @@ public class _ObjectDelM implements _ObjectDel
IceInternal.OutgoingConnectionFactory factory = __reference.instance.outgoingConnectionFactory();
__connection = factory.create(filteredEndpoints);
assert(__connection != null);
- __connection.incUsageCount();
+ __connection.incProxyCount();
}
catch (LocalException ex)
{
@@ -453,7 +453,7 @@ public class _ObjectDelM implements _ObjectDel
{
if(__connection != null)
{
- __connection.decUsageCount();
+ __connection.decProxyCount();
}
while(__outgoingCache != null)
diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java
index 6b89ecc732b..3dc23113118 100644
--- a/java/src/IceInternal/Connection.java
+++ b/java/src/IceInternal/Connection.java
@@ -16,160 +16,173 @@ package IceInternal;
public final class Connection extends EventHandler
{
- public boolean
- destroyed()
+ public synchronized void
+ activate()
{
- _mutex.lock();
- try
- {
- return _state >= StateClosing;
- }
- finally
- {
- _mutex.unlock();
- }
+ setState(StateActive);
}
- public void
- validate()
+ public synchronized void
+ hold()
{
- _mutex.lock();
- try
- {
- if(_endpoint.datagram())
+ setState(StateHolding);
+ }
+
+ // DestructionReason
+ public final static int ObjectAdapterDeactivated = 0;
+ public final static int CommunicatorDestroyed = 1;
+
+ public synchronized void
+ destroy(int reason)
+ {
+ _batchStream.destroy();
+
+ switch(reason)
+ {
+ case ObjectAdapterDeactivated:
{
- //
- // Datagram connections are always implicitly validated.
- //
- return;
+ setState(StateClosing, new Ice.ObjectAdapterDeactivatedException());
+ break;
}
-
- try
+
+ case CommunicatorDestroyed:
{
- if(_adapter != null)
- {
- //
- // Incoming connections play the active role with
- // respect to connection validation.
- //
- BasicStream os = new BasicStream(_instance);
- os.writeByte(Protocol.protocolVersion);
- os.writeByte(Protocol.encodingVersion);
- os.writeByte(Protocol.validateConnectionMsg);
- os.writeInt(Protocol.headerSize); // Message size.
- TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
- }
- else
- {
- //
- // Outgoing connection play the passive role with
- // respect to connection validation.
- //
- BasicStream is = new BasicStream(_instance);
- is.resize(Protocol.headerSize, true);
- is.pos(0);
- _transceiver.read(is, _endpoint.timeout());
- int pos = is.pos();
- assert(pos >= Protocol.headerSize);
- is.pos(0);
- byte protVer = is.readByte();
- if(protVer != Protocol.protocolVersion)
- {
- throw new Ice.UnsupportedProtocolException();
- }
- byte encVer = is.readByte();
- if(encVer != Protocol.encodingVersion)
- {
- throw new Ice.UnsupportedEncodingException();
- }
- byte messageType = is.readByte();
- if(messageType != Protocol.validateConnectionMsg)
- {
- throw new Ice.ConnectionNotValidatedException();
- }
- int size = is.readInt();
- if(size != Protocol.headerSize)
- {
- throw new Ice.IllegalMessageSizeException();
- }
- TraceUtil.traceHeader("received validate connection", is, _logger, _traceLevels);
- }
+ setState(StateClosing, new Ice.CommunicatorDestroyedException());
+ break;
}
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- assert(_exception != null);
- throw _exception;
- }
}
- finally
- {
- _mutex.unlock();
- }
}
- public void
- hold()
+ public synchronized boolean
+ isDestroyed()
{
- _mutex.lock();
- try
- {
- setState(StateHolding);
- }
- finally
- {
- _mutex.unlock();
- }
+ return _state >= StateClosing;
}
- public void
- activate()
+ public synchronized boolean
+ isFinished()
{
- _mutex.lock();
- try
- {
- setState(StateActive);
- }
- finally
- {
- _mutex.unlock();
+ return _transceiver == null;
+ }
+
+ public synchronized void
+ waitUntilHolding()
+ {
+ while(_state < StateHolding || _dispatchCount > 0)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
}
}
- public void
- incUsageCount()
+ public synchronized void
+ waitUntilFinished()
{
- _mutex.lock();
- try
- {
- assert(_usageCount >= 0);
- ++_usageCount;
+ while(_transceiver != null)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
}
- finally
- {
- _mutex.unlock();
- }
}
- public void
- decUsageCount()
+ public synchronized void
+ validate()
{
- _mutex.lock();
- try
- {
- assert(_usageCount > 0);
- --_usageCount;
- if(_usageCount == 0 && _adapter == null)
+ if(_endpoint.datagram())
+ {
+ //
+ // Datagram connections are always implicitly validated.
+ //
+ return;
+ }
+
+ try
+ {
+ if(_adapter != null)
+ {
+ //
+ // Incoming connections play the active role with
+ // respect to connection validation.
+ //
+ BasicStream os = new BasicStream(_instance);
+ os.writeByte(Protocol.protocolVersion);
+ os.writeByte(Protocol.encodingVersion);
+ os.writeByte(Protocol.validateConnectionMsg);
+ os.writeInt(Protocol.headerSize); // Message size.
+ TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
+ }
+ else
{
- assert(_requests.isEmpty());
- setState(StateClosing, new Ice.CloseConnectionException());
+ //
+ // Outgoing connection play the passive role with
+ // respect to connection validation.
+ //
+ BasicStream is = new BasicStream(_instance);
+ is.resize(Protocol.headerSize, true);
+ is.pos(0);
+ _transceiver.read(is, _endpoint.timeout());
+ int pos = is.pos();
+ assert(pos >= Protocol.headerSize);
+ is.pos(0);
+ byte protVer = is.readByte();
+ if(protVer != Protocol.protocolVersion)
+ {
+ throw new Ice.UnsupportedProtocolException();
+ }
+ byte encVer = is.readByte();
+ if(encVer != Protocol.encodingVersion)
+ {
+ throw new Ice.UnsupportedEncodingException();
+ }
+ byte messageType = is.readByte();
+ if(messageType != Protocol.validateConnectionMsg)
+ {
+ throw new Ice.ConnectionNotValidatedException();
+ }
+ int size = is.readInt();
+ if(size != Protocol.headerSize)
+ {
+ throw new Ice.IllegalMessageSizeException();
+ }
+ TraceUtil.traceHeader("received validate connection", is, _logger, _traceLevels);
}
}
- finally
- {
- _mutex.unlock();
- }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception != null);
+ throw _exception;
+ }
+ }
+
+ public synchronized void
+ incProxyCount()
+ {
+ assert(_proxyCount >= 0);
+ ++_proxyCount;
+ }
+
+ public synchronized void
+ decProxyCount()
+ {
+ assert(_proxyCount > 0);
+ --_proxyCount;
+ if(_proxyCount == 0 && _adapter == null)
+ {
+ assert(_requests.isEmpty());
+ setState(StateClosing, new Ice.CloseConnectionException());
+ }
}
private final static byte[] _requestHdr =
@@ -187,62 +200,58 @@ public final class Connection extends EventHandler
os.writeBlob(_requestHdr);
}
- public void
+ public synchronized void
sendRequest(Outgoing out, boolean oneway)
{
- _mutex.lock();
- try
- {
- if(_exception != null)
- {
- throw _exception;
- }
- assert(_state < StateClosing);
-
- int requestId = 0;
-
- try
- {
- BasicStream os = out.os();
- os.pos(3);
-
- //
- // Fill in the message size and request ID.
- //
- os.writeInt(os.size());
- if(!_endpoint.datagram() && !oneway)
- {
- requestId = _nextRequestId++;
- if(requestId <= 0)
- {
- _nextRequestId = 1;
- requestId = _nextRequestId++;
- }
- os.writeInt(requestId);
- }
- TraceUtil.traceRequest("sending request", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- assert(_exception != null);
- throw _exception;
- }
-
- //
- // Only add to the request map if there was no exception, and if
- // the operation is not oneway.
- //
- if(!_endpoint.datagram() && !oneway)
- {
- _requests.put(requestId, out);
- }
- }
- finally
- {
- _mutex.unlock();
- }
+ if(_exception != null)
+ {
+ throw _exception;
+ }
+ assert(_state < StateClosing);
+
+ int requestId = 0;
+
+ try
+ {
+ BasicStream os = out.os();
+ os.pos(3);
+
+ //
+ // Fill in the message size and request ID.
+ //
+ os.writeInt(os.size());
+ if(!_endpoint.datagram() && !oneway)
+ {
+ requestId = _nextRequestId++;
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+ os.writeInt(requestId);
+ }
+
+ //
+ // Send the request.
+ //
+ TraceUtil.traceRequest("sending request", os, _logger, _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception != null);
+ throw _exception;
+ }
+
+ //
+ // Only add to the request map if there was no exception, and if
+ // the operation is not oneway.
+ //
+ if(!_endpoint.datagram() && !oneway)
+ {
+ _requests.put(requestId, out);
+ }
}
private final static byte[] _requestBatchHdr =
@@ -253,14 +262,22 @@ public final class Connection extends EventHandler
(byte)0, (byte)0, (byte)0, (byte)0 // Message size (placeholder).
};
- public void
+ public synchronized void
prepareBatchRequest(BasicStream os)
{
- _mutex.lock();
+ while(_batchStreamInUse && _exception == null)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
if(_exception != null)
{
- _mutex.unlock();
throw _exception;
}
assert(_state < StateClosing);
@@ -277,77 +294,165 @@ public final class Connection extends EventHandler
//
// Give the batch stream to caller, until finishBatchRequest()
- // is called.
+ // or abortBatchRequest() is called.
//
- _batchStream.swap(os);
+ _batchStreamInUse = true;
}
- public void
+ public synchronized void
finishBatchRequest(BasicStream os)
{
+ assert(_batchStreamInUse);
+
if(_exception != null)
{
- _mutex.unlock();
throw _exception;
}
assert(_state < StateClosing);
_batchStream.swap(os); // Get the batch stream back.
- _mutex.unlock(); // Give the Connection back.
+ ++_batchRequestNum; // Increment the number of requests in the batch.
+
+ //
+ // Give the Connection back.
+ //
+ _batchStreamInUse = false;
+ notifyAll();
}
- public void
+ public synchronized void
abortBatchRequest()
{
+ assert(_batchStreamInUse);
+
setState(StateClosed, new Ice.AbortBatchRequestException());
- _mutex.unlock(); // Give the Connection back.
+
+ //
+ // Give the Connection back.
+ //
+ _batchStreamInUse = false;
+ notifyAll();
}
- public void
+ public synchronized void
flushBatchRequest()
{
- _mutex.lock();
- try
- {
- if(_exception != null)
- {
- throw _exception;
- }
- assert(_state < StateClosing);
-
- try
- {
- if(_batchStream.size() == 0)
- {
- return; // Nothing to send.
- }
+ while(_batchStreamInUse && _exception == null)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
- _batchStream.pos(3);
+ if(_exception != null)
+ {
+ throw _exception;
+ }
+ assert(_state < StateClosing);
+
+ try
+ {
+ if(_batchStream.size() == 0)
+ {
+ return; // Nothing to send.
+ }
+
+ _batchStream.pos(3);
+
+ //
+ // Fill in the message size the number of requests in the batch.
+ //
+ _batchStream.writeInt(_batchStream.size());
+ _batchStream.writeInt(_batchRequestNum);
+
+ //
+ // Send the batch request.
+ //
+ TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
+ _transceiver.write(_batchStream, _endpoint.timeout());
+
+ //
+ // Reset _batchStream so that new batch messages can be sent.
+ //
+ _batchStream.destroy();
+ _batchStream = new BasicStream(_instance);
+ _batchRequestNum = 0;
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception != null);
+ throw _exception;
+ }
+ }
- //
- // Fill in the message size.
- //
- _batchStream.writeInt(_batchStream.size());
- TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
- _transceiver.write(_batchStream, _endpoint.timeout());
+ public synchronized void
+ sendResponse(BasicStream os)
+ {
+ try
+ {
+ if(_state == StateClosed)
+ {
+ return;
+ }
+
+ //
+ // Fill in the message size.
+ //
+ os.pos(3);
+ final int sz = os.size();
+ os.writeInt(sz);
+
+ //
+ // Send the reply.
+ //
+ TraceUtil.traceReply("sending reply", os, _logger, _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
+
+ if(--_dispatchCount == 0)
+ {
+ notifyAll();
+ }
+
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ initiateShutdown();
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ }
+ }
- //
- // Reset _batchStream so that new batch messages can be sent.
- //
- _batchStream.destroy();
- _batchStream = new BasicStream(_instance);
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- assert(_exception != null);
- throw _exception;
- }
- }
- finally
- {
- _mutex.unlock();
- }
+ public synchronized void
+ sendNoResponse()
+ {
+ try
+ {
+ if(_state == StateClosed)
+ {
+ return;
+ }
+
+ if(--_dispatchCount == 0)
+ {
+ notifyAll();
+ }
+
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ initiateShutdown();
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ }
}
public int
@@ -364,56 +469,40 @@ public final class Connection extends EventHandler
return _endpoint;
}
- public void
+ public synchronized void
setAdapter(Ice.ObjectAdapter adapter)
{
- _mutex.lock();
- try
- {
- //
- // We are registered with a thread pool in active and closing
- // mode. However, we only change subscription if we're in active
- // mode, and thus ignore closing mode here.
- //
- if(_state == StateActive)
- {
- if(adapter != null && _adapter == null)
- {
- //
- // Client is now server.
- //
- unregisterWithPool();
- }
-
- if(adapter == null && _adapter != null)
- {
- //
- // Server is now client.
- //
- unregisterWithPool();
- }
- }
-
- _adapter = adapter;
- }
- finally
- {
- _mutex.unlock();
- }
+ //
+ // We are registered with a thread pool in active and closing
+ // mode. However, we only change subscription if we're in active
+ // mode, and thus ignore closing mode here.
+ //
+ if(_state == StateActive)
+ {
+ if(adapter != null && _adapter == null)
+ {
+ //
+ // Client is now server.
+ //
+ unregisterWithPool();
+ }
+
+ if(adapter == null && _adapter != null)
+ {
+ //
+ // Server is now client.
+ //
+ unregisterWithPool();
+ }
+ }
+
+ _adapter = adapter;
}
- public Ice.ObjectAdapter
+ public synchronized Ice.ObjectAdapter
getAdapter()
{
- _mutex.lock();
- try
- {
- return _adapter;
- }
- finally
- {
- _mutex.unlock();
- }
+ return _adapter;
}
//
@@ -442,11 +531,10 @@ public final class Connection extends EventHandler
public void
message(BasicStream stream, ThreadPool threadPool)
{
- Incoming in = null;
- boolean batch = false;
+ int invoke = 0;
+ int requestId = 0;
- _mutex.lock();
- try
+ synchronized(this)
{
threadPool.promoteFollower();
@@ -483,7 +571,9 @@ public final class Connection extends EventHandler
else
{
TraceUtil.traceRequest("received request", stream, _logger, _traceLevels);
- in = getIncoming();
+ requestId = stream.readInt();
+ invoke = 1;
+ ++_dispatchCount;
}
break;
}
@@ -499,8 +589,12 @@ public final class Connection extends EventHandler
else
{
TraceUtil.traceBatchRequest("received batch request", stream, _logger, _traceLevels);
- in = getIncoming();
- batch = true;
+ invoke = stream.readInt();
+ if(invoke < 0)
+ {
+ throw new Ice.NegativeSizeException();
+ }
+ _dispatchCount += invoke;
}
break;
}
@@ -508,7 +602,7 @@ public final class Connection extends EventHandler
case Protocol.replyMsg:
{
TraceUtil.traceReply("received reply", stream, _logger, _traceLevels);
- int requestId = stream.readInt();
+ requestId = stream.readInt();
Outgoing out = (Outgoing)_requests.remove(requestId);
if(out == null)
{
@@ -562,111 +656,58 @@ public final class Connection extends EventHandler
return;
}
}
- finally
- {
- _mutex.unlock();
- }
//
// Method invocation must be done outside the thread
// synchronization, so that nested calls are possible.
//
- if(in != null)
+ if(invoke > 0)
{
+ Incoming in = null;
+
try
{
//
// Prepare the invocation.
//
+ boolean response = !_endpoint.datagram() && requestId != 0;
+ in = getIncoming(response);
BasicStream is = in.is();
stream.swap(is);
- BasicStream os = null;
+ BasicStream os = in.os();
try
{
//
// Prepare the response if necessary.
//
- if(!batch)
+ if(response)
{
- int requestId = is.readInt();
- if(!_endpoint.datagram() && requestId != 0) // 0 means oneway.
- {
- ++_responseCount;
- os = in.os();
- os.writeBlob(_replyHdr);
- os.writeInt(requestId);
- }
+ assert(invoke == 1);
+ os.writeBlob(_replyHdr);
+
+ //
+ // Fill in the request ID.
+ //
+ os.writeInt(requestId);
}
//
- // Do the invocation, or multiple invocations for
- // batch messages.
+ // Do the invocation, or multiple invocations for batch
+ // messages.
//
- do
+ while(invoke-- > 0)
{
- in.invoke(os != null);
+ in.invoke();
}
- while(batch && is.pos() < is.size());
}
catch(Ice.LocalException ex)
{
- reclaimIncoming(in); // Must be called outside of synchronization on _mutex.
+ reclaimIncoming(in); // Must be called outside the synchronization.
in = null;
- _mutex.lock();
- try
+ synchronized(this)
{
setState(StateClosed, ex);
- return;
- }
- finally
- {
- _mutex.unlock();
- }
- }
-
- //
- // Send a response if necessary.
- //
- if(os != null)
- {
- _mutex.lock();
-
- try
- {
- try
- {
- if(_state == StateClosed)
- {
- return;
- }
-
- //
- // Fill in the message size.
- //
- os.pos(3);
- final int sz = os.size();
- os.writeInt(sz);
-
- TraceUtil.traceReply("sending reply", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
-
- --_responseCount;
-
- if(_state == StateClosing && _responseCount == 0)
- {
- initiateShutdown();
- }
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- return;
- }
- }
- finally
- {
- _mutex.unlock();
}
}
}
@@ -674,52 +715,39 @@ public final class Connection extends EventHandler
{
if(in != null)
{
- reclaimIncoming(in); // Must be called outside of synchronization on _mutex.
+ reclaimIncoming(in); // Must be called outside the synchronization.
}
}
}
}
- public void
+ public synchronized void
finished(ThreadPool threadPool)
{
- _mutex.lock();
- try
- {
- threadPool.promoteFollower();
-
- if(_state == StateActive || _state == StateClosing)
- {
- registerWithPool();
- }
- else if(_state == StateClosed)
- {
- _transceiver.close();
- }
- }
- finally
- {
- _mutex.unlock();
- }
+ threadPool.promoteFollower();
+
+ if(_state == StateActive || _state == StateClosing)
+ {
+ registerWithPool();
+ }
+ else if(_state == StateClosed)
+ {
+ _transceiver.close();
+ _transceiver = null;
+ notifyAll();
+ }
}
- public void
+ public synchronized void
exception(Ice.LocalException ex)
{
- _mutex.lock();
- try
- {
- setState(StateClosed, ex);
- }
- finally
- {
- _mutex.unlock();
- }
+ setState(StateClosed, ex);
}
- public String
+ public synchronized String
toString()
{
+ assert(_transceiver != null);
return _transceiver.toString();
}
@@ -731,21 +759,25 @@ public final class Connection extends EventHandler
_adapter = adapter;
_logger = instance.logger();
_traceLevels = instance.traceLevels();
+ _registeredWithPool = false;
_warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false;
_nextRequestId = 1;
_batchStream = new BasicStream(instance);
- _responseCount = 0;
- _usageCount = 0;
+ _batchStreamInUse = false;
+ _batchRequestNum = 0;
+ _dispatchCount = 0;
+ _proxyCount = 0;
_state = StateHolding;
- _registeredWithPool = false;
}
protected void
finalize()
throws Throwable
{
- assert(_usageCount == 0);
assert(_state == StateClosed);
+ assert(_transceiver == null);
+ assert(_dispatchCount == 0);
+ assert(_proxyCount == 0);
assert(_incomingCache == null);
//
@@ -757,39 +789,6 @@ public final class Connection extends EventHandler
super.finalize();
}
- // DestructionReason
- public final static int ObjectAdapterDeactivated = 0;
- public final static int CommunicatorDestroyed = 1;
-
- public void
- destroy(int reason)
- {
- _mutex.lock();
- try
- {
- _batchStream.destroy();
-
- switch(reason)
- {
- case ObjectAdapterDeactivated:
- {
- setState(StateClosing, new Ice.ObjectAdapterDeactivatedException());
- break;
- }
-
- case CommunicatorDestroyed:
- {
- setState(StateClosing, new Ice.CommunicatorDestroyedException());
- break;
- }
- }
- }
- finally
- {
- _mutex.unlock();
- }
- }
-
private static final int StateActive = 0;
private static final int StateHolding = 1;
private static final int StateClosing = 2;
@@ -902,13 +901,15 @@ public final class Connection extends EventHandler
}
unregisterWithPool();
destroyIncomingCache();
+ _dispatchCount = 0;
break;
}
}
_state = state;
+ notifyAll();
- if(_state == StateClosing && _responseCount == 0)
+ if(_state == StateClosing && _dispatchCount == 0)
{
try
{
@@ -925,7 +926,7 @@ public final class Connection extends EventHandler
initiateShutdown()
{
assert(_state == StateClosing);
- assert(_responseCount == 0);
+ assert(_dispatchCount == 0);
if(!_endpoint.datagram())
{
@@ -1013,7 +1014,7 @@ public final class Connection extends EventHandler
}
private Incoming
- getIncoming()
+ getIncoming(boolean response)
{
Incoming in = null;
@@ -1021,14 +1022,14 @@ public final class Connection extends EventHandler
{
if(_incomingCache == null)
{
- in = new Incoming(_instance, _adapter);
+ in = new Incoming(_instance, _adapter, this, response);
}
else
{
in = _incomingCache;
_incomingCache = _incomingCache.next;
in.next = null;
- in.reset(_adapter);
+ in.reset(_adapter, this, response);
}
}
@@ -1038,8 +1039,6 @@ public final class Connection extends EventHandler
private void
reclaimIncoming(Incoming in)
{
- in.finished();
-
synchronized(_incomingCacheMutex)
{
in.next = _incomingCache;
@@ -1065,23 +1064,34 @@ public final class Connection extends EventHandler
}
}
- private final Transceiver _transceiver;
+ private Transceiver _transceiver;
private final Endpoint _endpoint;
+
private Ice.ObjectAdapter _adapter;
+
private final Ice.Logger _logger;
private final TraceLevels _traceLevels;
+
private ThreadPool _clientThreadPool;
private ThreadPool _serverThreadPool;
+ private boolean _registeredWithPool;
+
private final boolean _warn;
+
private int _nextRequestId;
private IntMap _requests = new IntMap();
+
private Ice.LocalException _exception;
+
private BasicStream _batchStream;
- private int _responseCount;
- private int _usageCount;
+ private boolean _batchStreamInUse;
+ private int _batchRequestNum;
+
+ private int _dispatchCount; // The number of requests currently being dispatched.
+ private int _proxyCount; // The number of proxies using this connection.
+
private int _state;
- private boolean _registeredWithPool;
- private RecursiveMutex _mutex = new RecursiveMutex();
+
private Incoming _incomingCache;
private java.lang.Object _incomingCacheMutex = new java.lang.Object();
}
diff --git a/java/src/IceInternal/Direct.java b/java/src/IceInternal/Direct.java
index 031bf58bc94..05efb96f406 100644
--- a/java/src/IceInternal/Direct.java
+++ b/java/src/IceInternal/Direct.java
@@ -21,10 +21,10 @@ public final class Direct
{
_current = current;
+ ((Ice.ObjectAdapterI)(_current.adapter)).incDirectCount();
+
try
{
- ((Ice.ObjectAdapterI)(_current.adapter)).incUsageCount();
-
_servant = _current.adapter.identityToServant(_current.id);
if(_servant == null && _current.id.category.length() > 0)
@@ -32,7 +32,7 @@ public final class Direct
_locator = _current.adapter.findServantLocator(_current.id.category);
if(_locator != null)
{
- _cookie = new Ice.LocalObjectHolder(); // Lazy creation
+ _cookie = new Ice.LocalObjectHolder(); // Lazy creation.
_servant = _locator.locate(_current, _cookie);
}
}
@@ -42,7 +42,7 @@ public final class Direct
_locator = _current.adapter.findServantLocator("");
if(_locator != null)
{
- _cookie = new Ice.LocalObjectHolder(); // Lazy creation
+ _cookie = new Ice.LocalObjectHolder(); // Lazy creation.
_servant = _locator.locate(_current, _cookie);
}
}
@@ -81,7 +81,7 @@ public final class Direct
}
finally
{
- ((Ice.ObjectAdapterI)(_current.adapter)).decUsageCount();
+ ((Ice.ObjectAdapterI)(_current.adapter)).decDirectCount();
}
}
}
@@ -98,7 +98,7 @@ public final class Direct
}
finally
{
- ((Ice.ObjectAdapterI)(_current.adapter)).decUsageCount();
+ ((Ice.ObjectAdapterI)(_current.adapter)).decDirectCount();
}
}
@@ -115,7 +115,7 @@ public final class Direct
}
}
- private Ice.Current _current;
+ private final Ice.Current _current;
private Ice.Object _servant;
private Ice.Object _facetServant;
private Ice.ServantLocator _locator;
diff --git a/java/src/IceInternal/Incoming.java b/java/src/IceInternal/Incoming.java
index 4694b464b44..c7857ba4bbc 100644
--- a/java/src/IceInternal/Incoming.java
+++ b/java/src/IceInternal/Incoming.java
@@ -17,33 +17,16 @@ package IceInternal;
public class Incoming
{
public
- Incoming(Instance instance, Ice.ObjectAdapter adapter)
+ Incoming(Instance instance, Ice.ObjectAdapter adapter, Connection connection, boolean response)
{
- _is = new BasicStream(instance);
- _os = new BasicStream(instance);
_current = new Ice.Current();
- _current.adapter = adapter;
_current.id = new Ice.Identity();
+ _current.adapter = adapter;
_cookie = new Ice.LocalObjectHolder();
-
- if(_current.adapter != null)
- {
- ((Ice.ObjectAdapterI)(_current.adapter)).incUsageCount();
- }
- }
-
- //
- // Must be called immediately after this object is no longer
- // needed, in order to update the object adapter usage count.
- //
- public void
- finished()
- {
- if(_current.adapter != null)
- {
- ((Ice.ObjectAdapterI)(_current.adapter)).decUsageCount();
- _current.adapter = null;
- }
+ _connection = connection;
+ _response = response;
+ _is = new BasicStream(instance);
+ _os = new BasicStream(instance);
}
//
@@ -51,23 +34,20 @@ public class Incoming
// reallocated.
//
public void
- reset(Ice.ObjectAdapter adapter)
+ reset(Ice.ObjectAdapter adapter, Connection connection, boolean response)
{
- _is.reset();
- _os.reset();
+ _current.adapter = adapter;
if(_current.ctx != null)
{
_current.ctx.clear();
}
-
- //assert(_current.adapter == null); // finished() should have been called
-
- _current.adapter = adapter;
-
- if(_current.adapter != null)
- {
- ((Ice.ObjectAdapterI)(_current.adapter)).incUsageCount();
- }
+ _servant = null;
+ _locator = null;
+ _cookie.value = null;
+ _connection = connection;
+ _response = response;
+ _is.reset();
+ _os.reset();
}
//
@@ -81,7 +61,7 @@ public class Incoming
}
public void
- invoke(boolean response)
+ invoke()
{
//
// Read the current.
@@ -104,16 +84,13 @@ public class Incoming
_is.startReadEncaps();
- if(response)
+ if(_response)
{
assert(_os.size() == Protocol.headerSize + 4); // Dispatch status position.
_os.writeByte((byte)0);
_os.startWriteEncaps();
}
- Ice.Object servant = null;
- Ice.ServantLocator locator = null;
- _cookie.value = null;
DispatchStatus status;
//
@@ -126,28 +103,28 @@ public class Incoming
{
if(_current.adapter != null)
{
- servant = _current.adapter.identityToServant(_current.id);
+ _servant = _current.adapter.identityToServant(_current.id);
- if(servant == null && _current.id.category.length() > 0)
+ if(_servant == null && _current.id.category.length() > 0)
{
- locator = _current.adapter.findServantLocator(_current.id.category);
- if(locator != null)
+ _locator = _current.adapter.findServantLocator(_current.id.category);
+ if(_locator != null)
{
- servant = locator.locate(_current, _cookie);
+ _servant = _locator.locate(_current, _cookie);
}
}
- if(servant == null)
+ if(_servant == null)
{
- locator = _current.adapter.findServantLocator("");
- if(locator != null)
+ _locator = _current.adapter.findServantLocator("");
+ if(_locator != null)
{
- servant = locator.locate(_current, _cookie);
+ _servant = _locator.locate(_current, _cookie);
}
}
}
- if(servant == null)
+ if(_servant == null)
{
status = DispatchStatus.DispatchObjectNotExist;
}
@@ -155,7 +132,7 @@ public class Incoming
{
if(_current.facet.length > 0)
{
- Ice.Object facetServant = servant.ice_findFacetPath(_current.facet, 0);
+ Ice.Object facetServant = _servant.ice_findFacetPath(_current.facet, 0);
if(facetServant == null)
{
status = DispatchStatus.DispatchFacetNotExist;
@@ -167,19 +144,12 @@ public class Incoming
}
else
{
- status = servant.__dispatch(this, _current);
+ status = _servant.__dispatch(this, _current);
}
}
}
catch(Ice.RequestFailedException ex)
{
- if(locator != null && servant != null)
- {
- locator.finished(_current, servant, _cookie.value);
- }
-
- _is.endReadEncaps();
-
if(ex.id == null)
{
ex.id = _current.id;
@@ -195,7 +165,9 @@ public class Incoming
ex.operation = _current.operation;
}
- if(response)
+ warning(ex);
+
+ if(_response)
{
_os.endWriteEncaps();
_os.resize(Protocol.headerSize + 4, false); // Dispatch status position.
@@ -220,19 +192,14 @@ public class Incoming
_os.writeString(ex.operation);
}
- warning(ex);
+ finishInvoke();
return;
}
catch(Ice.LocalException ex)
{
- if(locator != null && servant != null)
- {
- locator.finished(_current, servant, _cookie.value);
- }
-
- _is.endReadEncaps();
+ warning(ex);
- if(response)
+ if(_response)
{
_os.endWriteEncaps();
_os.resize(Protocol.headerSize + 4, false); // Dispatch status position.
@@ -240,7 +207,7 @@ public class Incoming
_os.writeString(ex.toString());
}
- warning(ex);
+ finishInvoke();
return;
}
/* Not possible in Java - UserExceptions are checked exceptions
@@ -251,14 +218,9 @@ public class Incoming
*/
catch(RuntimeException ex)
{
- if(locator != null && servant != null)
- {
- locator.finished(_current, servant, _cookie.value);
- }
-
- _is.endReadEncaps();
+ warning(ex);
- if(response)
+ if(_response)
{
_os.endWriteEncaps();
_os.resize(Protocol.headerSize + 4, false); // Dispatch status position.
@@ -266,18 +228,17 @@ public class Incoming
_os.writeString(ex.toString());
}
- warning(ex);
+ finishInvoke();
return;
}
- if(locator != null && servant != null)
- {
- locator.finished(_current, servant, _cookie.value);
- }
-
- _is.endReadEncaps();
+ //
+ // Don't put the code below into the try block above. Exceptions
+ // in the code below are considered fatal, and must propagate to
+ // the caller of this operation.
+ //
- if(response)
+ if(_response)
{
_os.endWriteEncaps();
@@ -302,6 +263,8 @@ public class Incoming
_os.pos(save);
}
}
+
+ finishInvoke();
}
public BasicStream
@@ -317,6 +280,31 @@ public class Incoming
}
private void
+ finishInvoke()
+ {
+ if(_locator != null && _servant != null)
+ {
+ _locator.finished(_current, _servant, _cookie.value);
+ }
+
+ _is.endReadEncaps();
+
+ //
+ // Send a response if necessary. If we don't need to send a
+ // response, we still need to tell the connection that we're
+ // finished with dispatching.
+ //
+ if(_response)
+ {
+ _connection.sendResponse(_os);
+ }
+ else
+ {
+ _connection.sendNoResponse();
+ }
+ }
+
+ private void
warning(Exception ex)
{
if(_os.instance().properties().getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 0)
@@ -356,10 +344,16 @@ public class Incoming
}
}
- private BasicStream _is;
- private BasicStream _os;
private Ice.Current _current;
+ private Ice.Object _servant;
+ private Ice.ServantLocator _locator;
private Ice.LocalObjectHolder _cookie;
- Incoming next; // For use by Connection
+ private Connection _connection;
+ private boolean _response;
+
+ private BasicStream _is;
+ private BasicStream _os;
+
+ Incoming next; // For use by Connection.
}
diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java
index 5bb7816ef96..ce96f78d123 100644
--- a/java/src/IceInternal/IncomingConnectionFactory.java
+++ b/java/src/IceInternal/IncomingConnectionFactory.java
@@ -17,15 +17,84 @@ package IceInternal;
public class IncomingConnectionFactory extends EventHandler
{
public synchronized void
+ activate()
+ {
+ setState(StateActive);
+ }
+
+ public synchronized void
hold()
{
setState(StateHolding);
}
public synchronized void
- activate()
+ destroy()
{
- setState(StateActive);
+ setState(StateClosed);
+ }
+
+ public synchronized void
+ waitUntilHolding()
+ {
+ //
+ // First we wait until the connection factory itself is in
+ // holding state.
+ //
+ while(_state < StateHolding)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ //
+ // Now we wait until each connection is in holding state.
+ //
+ java.util.ListIterator iter = _connections.listIterator();
+ while(iter.hasNext())
+ {
+ Connection connection = (Connection)iter.next();
+ connection.waitUntilHolding();
+ }
+ }
+
+ public synchronized void
+ waitUntilFinished()
+ {
+ //
+ // First we wait until the factory is destroyed.
+ //
+ while(_acceptor != null)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ //
+ // Now we wait for until the destruction of each connection is
+ // finished.
+ //
+ java.util.ListIterator iter = _connections.listIterator();
+ while(iter.hasNext())
+ {
+ Connection connection = (Connection)iter.next();
+ connection.waitUntilFinished();
+ }
+
+ //
+ // We're done, now we can throw away all connections.
+ //
+ _connections.clear();
}
public Endpoint
@@ -50,21 +119,23 @@ public class IncomingConnectionFactory extends EventHandler
public synchronized Connection[]
connections()
{
- //
- // Reap destroyed connections.
- //
+ java.util.LinkedList connections = new java.util.LinkedList();
+
+ //
+ // Only copy connections which have not been destroyed.
+ //
java.util.ListIterator iter = _connections.listIterator();
while(iter.hasNext())
{
Connection connection = (Connection)iter.next();
- if(connection.destroyed())
+ if(!connection.isDestroyed())
{
- iter.remove();
+ connections.add(connection);
}
}
- Connection[] arr = new Connection[_connections.size()];
- _connections.toArray(arr);
+ Connection[] arr = new Connection[connections.size()];
+ connections.toArray(arr);
return arr;
}
@@ -93,14 +164,14 @@ public class IncomingConnectionFactory extends EventHandler
return;
}
- //
- // Reap destroyed connections.
- //
+ //
+ // Reap connections for which destruction has completed.
+ //
java.util.ListIterator iter = _connections.listIterator();
while(iter.hasNext())
{
Connection connection = (Connection)iter.next();
- if(connection.destroyed())
+ if(connection.isFinished())
{
iter.remove();
}
@@ -169,20 +240,16 @@ public class IncomingConnectionFactory extends EventHandler
{
threadPool.promoteFollower();
- if(_state == StateActive)
- {
- registerWithPool();
- }
- else if(_state == StateClosed)
- {
- _acceptor.close();
-
- //
- // We don't need the adapter anymore after we closed the
- // acceptor.
- //
- _adapter = null;
- }
+ if(_state == StateActive)
+ {
+ registerWithPool();
+ }
+ else if(_state == StateClosed)
+ {
+ _acceptor.close();
+ _acceptor = null;
+ notifyAll();
+ }
}
public void
@@ -191,7 +258,8 @@ public class IncomingConnectionFactory extends EventHandler
assert(false); // Must not be called.
}
- public String toString()
+ public synchronized String
+ toString()
{
if(_transceiver != null)
{
@@ -208,9 +276,9 @@ public class IncomingConnectionFactory extends EventHandler
super(instance);
_endpoint = endpoint;
_adapter = adapter;
+ _registeredWithPool = false;
_warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false;
_state = StateHolding;
- _registeredWithPool = false;
DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
if(defaultsAndOverrides.overrideTimeout)
@@ -229,12 +297,6 @@ public class IncomingConnectionFactory extends EventHandler
Connection connection = new Connection(_instance, _transceiver, _endpoint, _adapter);
connection.validate();
_connections.add(connection);
-
- //
- // We don't need an adapter anymore if we don't use an
- // acceptor.
- //
- _adapter = null;
}
else
{
@@ -256,8 +318,9 @@ public class IncomingConnectionFactory extends EventHandler
finalize()
throws Throwable
{
- assert(_state == StateClosed);
- assert(_adapter == null);
+ assert(_state == StateClosed);
+ assert(_acceptor == null);
+ assert(_connections.size() == 0);
//
// Destroy the EventHandler's stream, so that its buffer
@@ -268,12 +331,6 @@ public class IncomingConnectionFactory extends EventHandler
super.finalize();
}
- public synchronized void
- destroy()
- {
- setState(StateClosed);
- }
-
private static final int StateActive = 0;
private static final int StateHolding = 1;
private static final int StateClosed = 2;
@@ -307,7 +364,7 @@ public class IncomingConnectionFactory extends EventHandler
case StateHolding:
{
- if(_state != StateActive) // Can only switch from active to holding
+ if(_state != StateActive) // Can only switch from active to holding.
{
return;
}
@@ -340,13 +397,12 @@ public class IncomingConnectionFactory extends EventHandler
Connection connection = (Connection)iter.next();
connection.destroy(Connection.ObjectAdapterDeactivated);
}
- _connections.clear();
-
- break;
+ break;
}
}
_state = state;
+ notifyAll();
}
private void
@@ -392,13 +448,18 @@ public class IncomingConnectionFactory extends EventHandler
_instance.logger().warning(s);
}
- private Endpoint _endpoint;
- private Ice.ObjectAdapter _adapter; // Cannot be final, because it must be set to null to break cyclic dependency.
private Acceptor _acceptor;
private final Transceiver _transceiver;
+ private Endpoint _endpoint;
+
+ private final Ice.ObjectAdapter _adapter;
+
private ThreadPool _serverThreadPool;
+ private boolean _registeredWithPool;
+
private final boolean _warn;
+
private java.util.LinkedList _connections = new java.util.LinkedList();
+
private int _state;
- private boolean _registeredWithPool;
}
diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java
index 10c0b4bd3e2..3303a6da684 100644
--- a/java/src/IceInternal/Instance.java
+++ b/java/src/IceInternal/Instance.java
@@ -265,113 +265,59 @@ public class Instance
public void
destroy()
{
- ThreadPool clientThreadPool;
- ThreadPool serverThreadPool;
-
+ assert(!_destroyed);
+
+ _objectAdapterFactory.shutdown();
+ _objectAdapterFactory.waitForShutdown();
+
+ _outgoingConnectionFactory.destroy();
+ _outgoingConnectionFactory.waitUntilFinished();
+
synchronized(this)
{
- if(_destroyed)
- {
- return; // Don't destroy twice.
- }
+ _objectAdapterFactory = null;
+ _outgoingConnectionFactory = null;
- _destroyed = true;
-
- if(_objectAdapterFactory != null)
+ if(_serverThreadPool != null)
{
- // Don't shut down the object adapters -- the communicator
- // must do this before it destroys this object.
- _objectAdapterFactory = null;
+ _serverThreadPool.destroy();
+ _serverThreadPool.joinWithAllThreads();
+ _serverThreadPool = null;
}
- if(_outgoingConnectionFactory != null)
+ if(_clientThreadPool != null)
{
- _outgoingConnectionFactory.destroy();
- _outgoingConnectionFactory = null;
- }
-
- //
- // We destroy the thread pool outside the thread
- // synchronization.
- //
- clientThreadPool = _clientThreadPool;
- _clientThreadPool = null;
- serverThreadPool = _serverThreadPool;
- _serverThreadPool = null;
- }
-
- //
- // We must destroy the outgoing connection factory before we
- // destroy the client thread pool.
- //
- if(clientThreadPool != null)
- {
- clientThreadPool.waitUntilFinished();
- clientThreadPool.destroy();
- clientThreadPool.joinWithAllThreads();
- }
-
- //
- // We must destroy the object adapter factory before we destroy
- // the server thread pool.
- //
- if(serverThreadPool != null)
- {
- serverThreadPool.waitUntilFinished();
- serverThreadPool.destroy();
- serverThreadPool.joinWithAllThreads();
- }
-
- synchronized(this)
- {
- if(_servantFactoryManager != null)
- {
- _servantFactoryManager.destroy();
- _servantFactoryManager = null;
+ _clientThreadPool.destroy();
+ _clientThreadPool.joinWithAllThreads();
+ _clientThreadPool = null;
}
- if(_userExceptionFactoryManager != null)
- {
- _userExceptionFactoryManager.destroy();
- _userExceptionFactoryManager = null;
- }
+ _servantFactoryManager.destroy();
+ _servantFactoryManager = null;
- if(_referenceFactory != null)
- {
- _referenceFactory.destroy();
- _referenceFactory = null;
- }
+ _userExceptionFactoryManager.destroy();
+ _userExceptionFactoryManager = null;
- if(_proxyFactory != null)
- {
- // No destroy function defined
- // _proxyFactory.destroy();
- _proxyFactory = null;
- }
-
- if(_routerManager != null)
- {
- _routerManager.destroy();
- _routerManager = null;
- }
-
- if(_locatorManager != null)
- {
- _locatorManager.destroy();
- _locatorManager = null;
- }
-
- if(_endpointFactoryManager != null)
- {
- _endpointFactoryManager.destroy();
- _endpointFactoryManager = null;
- }
-
- if(_pluginManager != null)
- {
- _pluginManager.destroy();
- _pluginManager = null;
- }
+ _referenceFactory.destroy();
+ _referenceFactory = null;
+
+ // No destroy function defined.
+ // _proxyFactory.destroy();
+ _proxyFactory = null;
+
+ _routerManager.destroy();
+ _routerManager = null;
+
+ _locatorManager.destroy();
+ _locatorManager = null;
+
+ _endpointFactoryManager.destroy();
+ _endpointFactoryManager = null;
+
+ _pluginManager.destroy();
+ _pluginManager = null;
+
+ _destroyed = true;
}
}
diff --git a/java/src/IceInternal/ObjectAdapterFactory.java b/java/src/IceInternal/ObjectAdapterFactory.java
index bec9d78f9cb..518d16932bd 100644
--- a/java/src/IceInternal/ObjectAdapterFactory.java
+++ b/java/src/IceInternal/ObjectAdapterFactory.java
@@ -19,6 +19,15 @@ public final class ObjectAdapterFactory
public synchronized void
shutdown()
{
+ //
+ // Ignore shutdown requests if the object adapter factory has
+ // already been shut down.
+ //
+ if(_instance == null)
+ {
+ return;
+ }
+
java.util.Iterator i = _adapters.values().iterator();
while(i.hasNext())
{
@@ -26,12 +35,53 @@ public final class ObjectAdapterFactory
adapter.deactivate();
}
- _adapters.clear();
+ _instance = null;
+ _communicator = null;
+
+ notifyAll();
}
+ public synchronized void
+ waitForShutdown()
+ {
+ //
+ // First we wait for the shutdown of the factory itself.
+ //
+ while(_instance != null)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ //
+ // Now we wait for deactivation of each object adapter.
+ //
+ java.util.Iterator i = _adapters.values().iterator();
+ while(i.hasNext())
+ {
+ Ice.ObjectAdapter adapter = (Ice.ObjectAdapter)i.next();
+ adapter.waitForDeactivate();
+ }
+
+ //
+ // We're done, now we can throw away the object adapters.
+ //
+ _adapters.clear();
+ }
+
public synchronized Ice.ObjectAdapter
createObjectAdapter(String name, String endpts, String id)
{
+ if(_instance == null)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+
Ice.ObjectAdapter adapter = (Ice.ObjectAdapter)_adapters.get(name);
if(adapter != null)
{
@@ -46,21 +96,33 @@ public final class ObjectAdapterFactory
public synchronized Ice.ObjectAdapter
findObjectAdapter(Ice.ObjectPrx proxy)
{
+ if(_instance == null)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+
java.util.Iterator i = _adapters.values().iterator();
while(i.hasNext())
{
Ice.ObjectAdapterI adapter = (Ice.ObjectAdapterI)i.next();
- if(adapter.isLocal(proxy))
- {
- return adapter;
- }
- }
+ try
+ {
+ if(adapter.isLocal(proxy))
+ {
+ return adapter;
+ }
+ }
+ catch(Ice.ObjectAdapterDeactivatedException ex)
+ {
+ // Ignore.
+ }
+ }
return null;
}
//
- // Only for use by Instance
+ // Only for use by Instance.
//
ObjectAdapterFactory(Instance instance, Ice.Communicator communicator)
{
@@ -68,6 +130,17 @@ public final class ObjectAdapterFactory
_communicator = communicator;
}
+ protected void
+ finalize()
+ throws Throwable
+ {
+ assert(_instance == null);
+ assert(_communicator == null);
+ assert(_adapters.size() == 0);
+
+ super.finalize();
+ }
+
private Instance _instance;
private Ice.Communicator _communicator;
private java.util.HashMap _adapters = new java.util.HashMap();
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 09f0f317c29..2c4b5d0e6c3 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -16,6 +16,58 @@ package IceInternal;
public class OutgoingConnectionFactory
{
+ public synchronized void
+ destroy()
+ {
+ if(_instance == null)
+ {
+ return;
+ }
+
+ java.util.Iterator p = _connections.values().iterator();
+ while(p.hasNext())
+ {
+ Connection connection = (Connection)p.next();
+ connection.destroy(Connection.CommunicatorDestroyed);
+ }
+
+ _instance = null;
+ }
+
+ public synchronized void
+ waitUntilFinished()
+ {
+ //
+ // First we wait until the factory is destroyed.
+ //
+ while(_instance != null)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ //
+ // Now we wait for until the destruction of each connection is
+ // finished.
+ //
+ java.util.Iterator p = _connections.values().iterator();
+ while(p.hasNext())
+ {
+ Connection connection = (Connection)p.next();
+ connection.waitUntilFinished();
+ }
+
+ //
+ // We're done, now we can throw away all connections.
+ //
+ _connections.clear();
+ }
+
public synchronized Connection
create(Endpoint[] endpoints)
{
@@ -26,14 +78,14 @@ public class OutgoingConnectionFactory
assert(endpoints.length > 0);
- //
- // Reap destroyed connections.
- //
+ //
+ // Reap connections for which destruction has completed.
+ //
java.util.Iterator p = _connections.values().iterator();
while(p.hasNext())
{
Connection connection = (Connection)p.next();
- if(connection.destroyed())
+ if(connection.isFinished())
{
p.remove();
}
@@ -54,12 +106,19 @@ public class OutgoingConnectionFactory
Connection connection = (Connection)_connections.get(endpoint);
if(connection != null)
{
- return connection;
+ //
+ // Don't return connections for which destruction has
+ // been initiated.
+ //
+ if(!connection.isDestroyed())
+ {
+ return connection;
+ }
}
}
//
- // No connections exist, try to create one
+ // No connections exist, try to create one.
//
TraceLevels traceLevels = _instance.traceLevels();
Ice.Logger logger = _instance.logger();
@@ -195,24 +254,6 @@ public class OutgoingConnectionFactory
super.finalize();
}
- public synchronized void
- destroy()
- {
- if(_instance == null)
- {
- return;
- }
-
- java.util.Iterator p = _connections.values().iterator();
- while(p.hasNext())
- {
- Connection connection = (Connection)p.next();
- connection.destroy(Connection.CommunicatorDestroyed);
- }
- _connections.clear();
- _instance = null;
- }
-
private Instance _instance;
private java.util.HashMap _connections = new java.util.HashMap();
}
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 33b15884b42..4576880f122 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -29,11 +29,8 @@ public final class ThreadPool
{
if(TRACE_REGISTRATION)
{
- trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd +
- ", handler count = " + (_handlers + 1));
+ trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd);
}
-
- ++_handlers;
_changes.add(new FdHandlerPair(fd, handler));
setInterrupt(0);
}
@@ -88,41 +85,6 @@ public final class ThreadPool
setInterrupt(1);
}
- public synchronized void
- waitUntilFinished()
- {
- if(TRACE_SHUTDOWN)
- {
- trace("waiting until finished...");
- }
-
- while(_handlers != 0 && _threadNum != 0)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
-
- if(_handlers != 0)
- {
- _instance.logger().error("can't wait for graceful application termination in thread pool\n" +
- "since all threads have vanished");
- }
- else
- {
- assert(_handlerMap.isEmpty());
- }
-
- if(TRACE_SHUTDOWN)
- {
- trace("finished.");
- }
- }
-
public void
joinWithAllThreads()
{
@@ -155,7 +117,6 @@ public final class ThreadPool
{
_instance = instance;
_destroyed = false;
- _handlers = 0;
_timeout = 0;
_multipleThreads = false;
_name = name;
@@ -183,23 +144,24 @@ public final class ThreadPool
//
_keys = _selector.selectedKeys();
+ int threadNum;
if(server)
{
_timeout = _instance.properties().getPropertyAsInt("Ice.ServerIdleTime");
_timeoutMillis = _timeout * 1000;
- _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10);
+ threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10);
}
else
{
- _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 1);
+ threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 1);
}
- if(_threadNum < 1)
+ if(threadNum < 1)
{
- _threadNum = 1;
+ threadNum = 1;
}
- if(_threadNum > 1)
+ if(threadNum > 1)
{
_multipleThreads = true;
}
@@ -216,8 +178,8 @@ public final class ThreadPool
try
{
- _threads = new EventHandlerThread[_threadNum];
- for(int i = 0; i < _threadNum; i++)
+ _threads = new EventHandlerThread[threadNum];
+ for(int i = 0; i < threadNum; i++)
{
_threads[i] = new EventHandlerThread(threadNamePrefix + _name + "-" + i);
_threads[i].start();
@@ -387,8 +349,6 @@ public final class ThreadPool
private void
run(BasicStream stream)
{
- boolean shutdown = false;
-
while(true)
{
if(_multipleThreads)
@@ -405,21 +365,6 @@ public final class ThreadPool
while(true)
{
- if(shutdown) // Shutdown has been initiated.
- {
- if(TRACE_SHUTDOWN)
- {
- trace("shutdown detected");
- }
-
- shutdown = false;
- ObjectAdapterFactory factory = _instance.objectAdapterFactory();
- if(factory != null)
- {
- factory.shutdown();
- }
- }
-
if(TRACE_REGISTRATION)
{
java.util.Set keys = _selector.keys();
@@ -433,7 +378,7 @@ public final class ThreadPool
}
select();
- if(_keys.size() == 0) // Timeout.
+ if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout.
{
if(TRACE_SELECT)
{
@@ -443,12 +388,13 @@ public final class ThreadPool
assert(_timeout > 0);
_timeout = 0;
_timeoutMillis = 0;
- shutdown = true;
+ initiateShutdown();
continue repeatSelect;
}
EventHandler handler = null;
boolean finished = false;
+ boolean shutdown = false;
synchronized(this)
{
@@ -493,71 +439,68 @@ public final class ThreadPool
shutdown = clearInterrupt();
- //
- // Server shutdown?
- //
- if(shutdown)
+ if(!shutdown)
{
- continue repeatSelect;
- }
-
- //
- // An event handler must have been registered or
- // unregistered.
- //
- assert(!_changes.isEmpty());
- FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
-
- if(change.handler != null) // Addition if handler is set.
- {
- int op;
- if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
- {
- op = java.nio.channels.SelectionKey.OP_READ;
- }
- else
- {
- op = java.nio.channels.SelectionKey.OP_ACCEPT;
- }
-
- java.nio.channels.SelectionKey key = null;
- try
- {
- key = change.fd.register(_selector, op, change.handler);
- }
- catch(java.nio.channels.ClosedChannelException ex)
- {
- assert(false);
- }
- _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
-
- if(TRACE_REGISTRATION)
- {
- trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
- change.fd);
- }
-
- continue repeatSelect;
- }
- else // Removal if handler is not set.
- {
- HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd);
- assert(pair != null);
- handler = pair.handler;
- finished = true;
- pair.key.cancel();
-
- if(TRACE_REGISTRATION)
- {
- trace("removed handler (" + handler.getClass().getName() + ") for fd " + change.fd);
- }
-
- // Don't goto repeatSelect; we have to call
- // finished() on the event handler below, outside
- // the thread synchronization.
- }
- }
- else
+ //
+ // An event handler must have been
+ // registered or unregistered.
+ //
+ assert(!_changes.isEmpty());
+ FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
+
+ if(change.handler != null) // Addition if handler is set.
+ {
+ int op;
+ if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
+ {
+ op = java.nio.channels.SelectionKey.OP_READ;
+ }
+ else
+ {
+ op = java.nio.channels.SelectionKey.OP_ACCEPT;
+ }
+
+ java.nio.channels.SelectionKey key = null;
+ try
+ {
+ key = change.fd.register(_selector, op, change.handler);
+ }
+ catch(java.nio.channels.ClosedChannelException ex)
+ {
+ assert(false);
+ }
+ _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
+
+ if(TRACE_REGISTRATION)
+ {
+ trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
+ change.fd);
+ }
+
+ continue repeatSelect;
+ }
+ else // Removal if handler is not set.
+ {
+ HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd);
+ assert(pair != null);
+ handler = pair.handler;
+ finished = true;
+ pair.key.cancel();
+
+ if(TRACE_REGISTRATION)
+ {
+ trace("removed handler (" + handler.getClass().getName() + ") for fd " +
+ change.fd);
+ }
+
+ // Don't goto repeatSelect; we have to
+ // call finished() on the event
+ // handler below, outside the thread
+ // synchronization.
+ }
+ }
+ }
+ else
{
java.nio.channels.SelectionKey key = null;
java.util.Iterator iter = _keys.iterator();
@@ -594,80 +537,92 @@ public final class ThreadPool
}
}
- assert(handler != null);
+ assert(handler != null || shutdown);
- if(finished)
+ if(shutdown) // Shutdown has been initiated.
{
- //
- // Notify a handler about it's removal from the thread
- // pool.
- //
- try
- {
- handler.finished(this);
- }
- catch(Ice.LocalException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception while calling finished():\n" + sw.toString() + "\n" + handler.toString();
- _instance.logger().error(s);
- }
-
- synchronized(this)
+ if(TRACE_SHUTDOWN)
{
- assert(_handlers > 0);
- if(--_handlers == 0)
- {
- notifyAll(); // For waitUntilFinished().
- }
+ trace("shutdown detected");
}
- }
- else
- {
- //
- // If the handler is "readable", try to read a message.
- //
- try
- {
- if(handler.readable())
- {
- try
- {
- read(handler);
- }
- catch(Ice.TimeoutException ex) // Expected
- {
- continue repeatSelect;
- }
- catch(Ice.LocalException ex)
- {
- if(TRACE_EXCEPTION)
- {
- trace("informing handler (" + handler.getClass().getName() +
- ") about exception " + ex);
- ex.printStackTrace();
- }
-
- handler.exception(ex);
- continue repeatSelect;
- }
- stream.swap(handler._stream);
- assert(stream.pos() == stream.size());
- }
+ ObjectAdapterFactory factory = _instance.objectAdapterFactory();
+ if(factory == null)
+ {
+ continue repeatSelect;
+ }
- handler.message(stream, this);
- }
- finally
- {
- stream.reset();
- }
+ promoteFollower();
+ factory.shutdown();
}
-
- break; // inner while loop
+ else
+ {
+ if(finished)
+ {
+ //
+ // Notify a handler about it's removal from
+ // the thread pool.
+ //
+ try
+ {
+ handler.finished(this);
+ }
+ catch(Ice.LocalException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "exception while calling finished():\n" + sw.toString() + "\n" +
+ handler.toString();
+ _instance.logger().error(s);
+ }
+ }
+ else
+ {
+ //
+ // If the handler is "readable", try to read a
+ // message.
+ //
+ try
+ {
+ if(handler.readable())
+ {
+ try
+ {
+ read(handler);
+ }
+ catch(Ice.TimeoutException ex) // Expected.
+ {
+ continue repeatSelect;
+ }
+ catch(Ice.LocalException ex)
+ {
+ if(TRACE_EXCEPTION)
+ {
+ trace("informing handler (" + handler.getClass().getName() +
+ ") about exception " + ex);
+ ex.printStackTrace();
+ }
+
+ handler.exception(ex);
+ continue repeatSelect;
+ }
+
+ stream.swap(handler._stream);
+ assert(stream.pos() == stream.size());
+ }
+
+ handler.message(stream, this);
+ }
+ finally
+ {
+ stream.reset();
+ }
+ }
+ }
+
+ break; // Inner while loop.
}
}
}
@@ -890,7 +845,6 @@ public final class ThreadPool
private java.util.Set _keys;
private java.util.LinkedList _changes = new java.util.LinkedList();
private java.util.HashMap _handlerMap = new java.util.HashMap();
- private int _handlers;
private int _timeout;
private int _timeoutMillis;
private RecursiveMutex _threadMutex = new RecursiveMutex();
@@ -932,25 +886,6 @@ public final class ThreadPool
_instance.logger().error(s);
}
- synchronized(ThreadPool.this)
- {
- --_threadNum;
- assert(_threadNum >= 0);
-
- //
- // The notifyAll() shouldn't be needed, *except* if one of the
- // threads exits because of an exception. (Which is an error
- // condition in Ice and if it happens needs to be debugged.)
- // However, I call notifyAll() anyway, in all cases, using a
- // "defensive" programming approach when it comes to
- // multithreading.
- //
- if(_threadNum == 0)
- {
- ThreadPool.this.notifyAll(); // For waitUntil...Finished() methods.
- }
- }
-
if(TRACE_THREAD)
{
trace("run() terminated - promoting follower");
@@ -962,5 +897,4 @@ public final class ThreadPool
}
}
private EventHandlerThread[] _threads;
- private int _threadNum; // Number of running threads
}
diff --git a/java/src/IceInternal/TraceUtil.java b/java/src/IceInternal/TraceUtil.java
index 9b7b842fa7c..509d8cd324b 100644
--- a/java/src/IceInternal/TraceUtil.java
+++ b/java/src/IceInternal/TraceUtil.java
@@ -71,11 +71,12 @@ final class TraceUtil
s.write(heading);
printHeader(s, str);
- int cnt = 0;
- while(str.pos() != str.size())
+ int batchRequestNum = str.readInt();
+ s.write("\nnumber of requests = " + batchRequestNum);
+
+ for(int i = 0; i < batchRequestNum; ++i)
{
- s.write("\nrequest #" + cnt + ':');
- cnt++;
+ s.write("\nrequest #" + i + ':');
printRequestHeader(s, str);
str.skipEncaps();
}