summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2008-03-06 10:13:42 +0100
committerBenoit Foucher <benoit@zeroc.com>2008-03-06 10:13:42 +0100
commitc6dbd090d9691cc0116a2967b2827b858b184dfe (patch)
tree6d2ad80c98665c9090b16f97c400ab4b33c7ab73 /java/src
parentMerge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff)
downloadice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.bz2
ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.xz
ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.zip
Removed thread-per-connection and added serialize mode
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/AMI_Object_ice_flushBatchRequests.java5
-rw-r--r--java/src/Ice/AMI_Object_ice_invoke.java7
-rw-r--r--java/src/Ice/ConnectionI.java1193
-rw-r--r--java/src/Ice/ObjectAdapterI.java29
-rw-r--r--java/src/Ice/ObjectPrx.java11
-rw-r--r--java/src/Ice/ObjectPrxHelperBase.java32
-rwxr-xr-xjava/src/IceGridGUI/Coordinator.java2
-rw-r--r--java/src/IceInternal/Acceptor.java3
-rw-r--r--java/src/IceInternal/BatchOutgoing.java4
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java68
-rw-r--r--java/src/IceInternal/ConnectionRequestHandler.java10
-rw-r--r--java/src/IceInternal/Connector.java2
-rw-r--r--java/src/IceInternal/EventHandler.java12
-rw-r--r--java/src/IceInternal/FixedReference.java12
-rw-r--r--java/src/IceInternal/IncomingConnectionFactory.java310
-rw-r--r--java/src/IceInternal/Instance.java34
-rw-r--r--java/src/IceInternal/Network.java138
-rw-r--r--java/src/IceInternal/Outgoing.java4
-rw-r--r--java/src/IceInternal/OutgoingAsync.java7
-rw-r--r--java/src/IceInternal/OutgoingAsyncMessageCallback.java46
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java60
-rw-r--r--java/src/IceInternal/PropertyNames.java116
-rw-r--r--java/src/IceInternal/Reference.java2
-rw-r--r--java/src/IceInternal/ReferenceFactory.java8
-rw-r--r--java/src/IceInternal/RequestHandler.java4
-rw-r--r--java/src/IceInternal/RoutableReference.java36
-rw-r--r--java/src/IceInternal/Selector.java479
-rw-r--r--java/src/IceInternal/SelectorHandler.java31
-rw-r--r--java/src/IceInternal/SelectorThread.java401
-rw-r--r--java/src/IceInternal/TcpAcceptor.java94
-rw-r--r--java/src/IceInternal/TcpConnector.java4
-rw-r--r--java/src/IceInternal/TcpTransceiver.java312
-rw-r--r--java/src/IceInternal/ThreadPool.java660
-rw-r--r--java/src/IceInternal/Transceiver.java20
-rw-r--r--java/src/IceInternal/UdpConnector.java2
-rw-r--r--java/src/IceInternal/UdpTransceiver.java182
-rw-r--r--java/src/IceInternal/UnknownEndpointI.java6
-rw-r--r--java/src/IceSSL/AcceptorI.java85
-rw-r--r--java/src/IceSSL/ConnectorI.java4
-rw-r--r--java/src/IceSSL/TransceiverI.java361
40 files changed, 1409 insertions, 3387 deletions
diff --git a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java
index 3c95aed0cd3..9087b9e1c26 100644
--- a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java
+++ b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java
@@ -13,7 +13,7 @@ public abstract class AMI_Object_ice_flushBatchRequests extends IceInternal.Batc
{
public abstract void ice_exception(LocalException ex);
- public final void __invoke(Ice.ObjectPrx prx)
+ public final boolean __invoke(Ice.ObjectPrx prx)
{
__acquireCallback(prx);
try
@@ -28,7 +28,7 @@ public abstract class AMI_Object_ice_flushBatchRequests extends IceInternal.Batc
try
{
delegate = proxy.__getDelegate(true);
- delegate.__getRequestHandler().flushAsyncBatchRequests(this);
+ return delegate.__getRequestHandler().flushAsyncBatchRequests(this);
}
catch(Ice.LocalException ex)
{
@@ -39,5 +39,6 @@ public abstract class AMI_Object_ice_flushBatchRequests extends IceInternal.Batc
{
__releaseCallback(ex);
}
+ return false;
}
}
diff --git a/java/src/Ice/AMI_Object_ice_invoke.java b/java/src/Ice/AMI_Object_ice_invoke.java
index 46deda21576..dfcc40477f7 100644
--- a/java/src/Ice/AMI_Object_ice_invoke.java
+++ b/java/src/Ice/AMI_Object_ice_invoke.java
@@ -14,8 +14,8 @@ public abstract class AMI_Object_ice_invoke extends IceInternal.OutgoingAsync
public abstract void ice_response(boolean ok, byte[] outParams);
public abstract void ice_exception(LocalException ex);
- public final void __invoke(Ice.ObjectPrx prx, String operation, OperationMode mode,
- byte[] inParams, java.util.Map context)
+ public final boolean __invoke(Ice.ObjectPrx prx, String operation, OperationMode mode,
+ byte[] inParams, java.util.Map context)
{
__acquireCallback(prx);
try
@@ -23,11 +23,12 @@ public abstract class AMI_Object_ice_invoke extends IceInternal.OutgoingAsync
__prepare(prx, operation, mode, context);
__os.writeBlob(inParams);
__os.endWriteEncaps();
- __send();
+ return __send();
}
catch(LocalException ex)
{
__releaseCallback(ex);
+ return false;
}
}
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 3005bdff402..f19a8083852 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -9,8 +9,7 @@
package Ice;
-public final class ConnectionI extends IceInternal.EventHandler
- implements Connection, IceInternal.SelectorThread.SocketReadyCallback
+public final class ConnectionI extends IceInternal.EventHandler implements Connection
{
public interface StartCallback
{
@@ -18,23 +17,6 @@ public final class ConnectionI extends IceInternal.EventHandler
void connectionStartFailed(ConnectionI connection, Ice.LocalException ex);
}
- public class CallFinished implements IceInternal.ThreadPoolWorkItem
- {
- public
- CallFinished(ConnectionI connection)
- {
- _connection = connection;
- }
-
- public void
- execute(IceInternal.ThreadPool threadPool)
- {
- _connection.finished(threadPool);
- }
-
- final private ConnectionI _connection;
- }
-
public void
start(StartCallback callback)
{
@@ -42,96 +24,44 @@ public final class ConnectionI extends IceInternal.EventHandler
{
synchronized(this)
{
- _startCallback = callback;
-
- //
- // The connection might already be closed if the communicator was destroyed.
- //
- if(_state == StateClosed)
+ if(_state == StateClosed) // The connection might already be closed if the communicator was destroyed.
{
assert(_exception != null);
throw _exception;
}
- }
-
- if(_threadPerConnection)
- {
- //
- // In thread per connection mode, we create the thread for the connection. The
- // intialization and validation of the connection is taken care of by the thread
- // per connection.
- //
- try
- {
- _thread = new ThreadPerConnection();
- _thread.start();
- }
- catch(java.lang.Exception ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- _logger.error("cannot create thread for connection:\n" + sw.toString());
-
- //
- // Clean up.
- //
- _thread = null;
- Ice.SyscallException e = new Ice.SyscallException();
- e.initCause(ex);
- throw e;
- }
- }
- else
- {
- //
- // Initialize the connection transceiver and then validate the connection.
- //
- IceInternal.SocketStatus status = initialize(0);
+ IceInternal.SocketStatus status = initialize();
if(status == IceInternal.SocketStatus.Finished)
{
- status = validate(0);
+ status = validate();
}
- if(status == IceInternal.SocketStatus.Finished)
+ if(status != IceInternal.SocketStatus.Finished)
{
- finishStart(null);
- return; // We're done!
- }
-
- //
- // If the initialization or validation couldn't be completed without potentially
- // blocking, we register the connection with the selector thread and return.
- //
- int timeout;
- IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideConnectTimeout)
- {
- timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
- }
- else
- {
- timeout = _endpoint.timeout();
- }
-
- synchronized(this)
- {
- if(_state == StateClosed)
+ //
+ // If the initialization or validation couldn't be completed without potentially
+ // blocking, we register the connection with the selector thread and return.
+ //
+ int timeout;
+ IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideConnectTimeout)
{
- assert(_exception != null);
- throw _exception;
+ timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
}
+ else
+ {
+ timeout = _endpoint.timeout();
+ }
+
_sendInProgress = true;
- _selectorThread._register(_transceiver.fd(), this, status, timeout);
- }
- }
+ _selectorThread._register(_socketReadyCallback, status, timeout);
+
+ if(callback != null)
+ {
+ _startCallback = callback;
+ return;
+ }
- if(callback == null) // Wait for the connection to be validated.
- {
- synchronized(this)
- {
while(_state <= StateNotValidated)
{
try
@@ -153,16 +83,22 @@ public final class ConnectionI extends IceInternal.EventHandler
}
catch(Ice.LocalException ex)
{
- synchronized(this)
+ exception(ex);
+ if(callback != null)
{
- setState(StateClosed, ex);
- if(callback != null)
- {
- return;
- }
+ callback.connectionStartFailed(this, _exception);
+ return;
+ }
+ else
+ {
+ waitUntilFinished();
+ throw ex;
}
- waitUntilFinished();
- throw ex;
+ }
+
+ if(callback != null)
+ {
+ callback.connectionStartCompleted(this);
}
}
@@ -253,39 +189,15 @@ public final class ConnectionI extends IceInternal.EventHandler
return _state > StateNotValidated && _state < StateClosing;
}
- public boolean
+ public synchronized boolean
isFinished()
{
- Thread threadPerConnection = null;
-
- synchronized(this)
- {
- if(_transceiver != null || _dispatchCount != 0 || (_thread != null && _thread.isAlive()))
- {
- return false;
- }
-
- assert(_state == StateClosed);
-
- threadPerConnection = _thread;
- _thread = null;
- }
-
- if(threadPerConnection != null)
+ if(_transceiver != null || _dispatchCount != 0)
{
- while(true)
- {
- try
- {
- threadPerConnection.join();
- break;
- }
- catch(InterruptedException ex)
- {
- }
- }
+ return false;
}
-
+
+ assert(_state == StateClosed);
return true;
}
@@ -314,105 +226,82 @@ public final class ConnectionI extends IceInternal.EventHandler
}
}
- public void
+ public synchronized void
waitUntilFinished()
{
- Thread threadPerConnection = null;
-
- synchronized(this)
+ //
+ // We wait indefinitely until connection closing has been
+ // initiated. We also wait indefinitely until all outstanding
+ // requests are completed. Otherwise we couldn't guarantee
+ // that there are no outstanding calls when deactivate() is
+ // called on the servant locators.
+ //
+ while(_state < StateClosing || _dispatchCount > 0)
{
- //
- // We wait indefinitely until connection closing has been
- // initiated. We also wait indefinitely until all outstanding
- // requests are completed. Otherwise we couldn't guarantee
- // that there are no outstanding calls when deactivate() is
- // called on the servant locators.
- //
- while(_state < StateClosing || _dispatchCount > 0)
+ try
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
+ wait();
}
-
- //
- // Now we must wait until close() has been called on the
- // transceiver.
- //
- while(_transceiver != null)
+ catch(InterruptedException ex)
{
- try
+ }
+ }
+
+ //
+ // Now we must wait until close() has been called on the
+ // transceiver.
+ //
+ while(_transceiver != null)
+ {
+ try
+ {
+ if(_state != StateClosed && _endpoint.timeout() >= 0)
{
- if(_state != StateClosed && _endpoint.timeout() >= 0)
- {
- long absoluteWaitTime = _stateTime + _endpoint.timeout();
- long waitTime = absoluteWaitTime - IceInternal.Time.currentMonotonicTimeMillis();
+ long absoluteWaitTime = _stateTime + _endpoint.timeout();
+ long waitTime = absoluteWaitTime - IceInternal.Time.currentMonotonicTimeMillis();
- if(waitTime > 0)
- {
- //
- // We must wait a bit longer until we close this
- // connection.
- //
- wait(waitTime);
- if(IceInternal.Time.currentMonotonicTimeMillis() >= absoluteWaitTime)
- {
- setState(StateClosed, new CloseTimeoutException());
- }
- }
- else
+ if(waitTime > 0)
+ {
+ //
+ // We must wait a bit longer until we close this
+ // connection.
+ //
+ wait(waitTime);
+ if(IceInternal.Time.currentMonotonicTimeMillis() >= absoluteWaitTime)
{
- //
- // We already waited long enough, so let's close this
- // connection!
- //
setState(StateClosed, new CloseTimeoutException());
}
-
- //
- // No return here, we must still wait until
- // close() is called on the _transceiver.
- //
}
else
{
- wait();
+ //
+ // We already waited long enough, so let's close this
+ // connection!
+ //
+ setState(StateClosed, new CloseTimeoutException());
}
+
+ //
+ // No return here, we must still wait until
+ // close() is called on the _transceiver.
+ //
}
- catch(InterruptedException ex)
+ else
{
+ wait();
}
}
-
- assert(_state == StateClosed);
-
- threadPerConnection = _thread;
- _thread = null;
-
- //
- // Clear the OA. See bug 1673 for the details of why this is necessary.
- //
- _adapter = null;
- }
-
- if(threadPerConnection != null)
- {
- while(true)
+ catch(InterruptedException ex)
{
- try
- {
- threadPerConnection.join();
- break;
- }
- catch(InterruptedException ex)
- {
- }
}
}
+
+ assert(_state == StateClosed);
+
+ //
+ // Clear the OA. See bug 1673 for the details of why this is necessary.
+ //
+ _adapter = null;
}
synchronized public void
@@ -507,7 +396,7 @@ public final class ConnectionI extends IceInternal.EventHandler
return sent; // The request was sent.
}
- synchronized public void
+ synchronized public boolean
sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response)
throws IceInternal.LocalExceptionWrapper
{
@@ -546,9 +435,10 @@ public final class ConnectionI extends IceInternal.EventHandler
os.writeInt(requestId);
}
+ boolean sent;
try
{
- sendMessage(new OutgoingMessage(out, out.__os(), compress, response));
+ sent = sendMessage(new OutgoingMessage(out, out.__os(), compress, response));
}
catch(Ice.LocalException ex)
{
@@ -564,6 +454,7 @@ public final class ConnectionI extends IceInternal.EventHandler
//
_asyncRequests.put(requestId, out);
}
+ return sent;
}
public synchronized void
@@ -812,7 +703,7 @@ public final class ConnectionI extends IceInternal.EventHandler
return sent;
}
- synchronized public void
+ synchronized public boolean
flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync)
{
while(_batchStreamInUse && _exception == null)
@@ -834,7 +725,7 @@ public final class ConnectionI extends IceInternal.EventHandler
if(_batchRequestNum == 0)
{
outAsync.__sent(this);
- return;
+ return true;
}
//
@@ -844,11 +735,12 @@ public final class ConnectionI extends IceInternal.EventHandler
_batchStream.writeInt(_batchRequestNum);
_batchStream.swap(outAsync.__os());
-
+
+ boolean sent;
try
{
OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.__os(), _batchRequestCompress, false);
- sendMessage(message);
+ sent = sendMessage(message);
}
catch(Ice.LocalException ex)
{
@@ -864,6 +756,7 @@ public final class ConnectionI extends IceInternal.EventHandler
_batchRequestNum = 0;
_batchRequestCompress = false;
_batchMarker = 0;
+ return sent;
}
synchronized public void
@@ -941,12 +834,6 @@ public final class ConnectionI extends IceInternal.EventHandler
return _endpoint; // No mutex protection necessary, _endpoint is immutable.
}
- public boolean
- threadPerConnection()
- {
- return _threadPerConnection; // No mutex protection necessary, _threadPerConnection is immutable.
- }
-
public synchronized void
setAdapter(ObjectAdapter adapter)
{
@@ -1000,29 +887,41 @@ public final class ConnectionI extends IceInternal.EventHandler
}
//
+ // Operations from SelectorHandler
+ //
+ public java.nio.channels.SelectableChannel
+ fd()
+ {
+ return _transceiver.fd();
+ }
+
+ public boolean
+ hasMoreData()
+ {
+ return _hasMoreData.value;
+ }
+
+ //
// Operations from EventHandler
//
public boolean
datagram()
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
return _endpoint.datagram(); // No mutex protection necessary, _endpoint is immutable.
}
public boolean
readable()
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
return true;
}
public boolean
read(IceInternal.BasicStream stream)
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
- return _transceiver.read(stream.getBuffer(), 0, _hasMoreData);
+ assert(_transceiver != null);
+ return _transceiver.read(stream.getBuffer(), _hasMoreData);
//
// Updating _acmAbsoluteTimeoutMillis is too expensive here,
@@ -1032,17 +931,9 @@ public final class ConnectionI extends IceInternal.EventHandler
//
}
- public boolean
- hasMoreData()
- {
- return _hasMoreData.value;
- }
-
public void
message(IceInternal.BasicStream stream, IceInternal.ThreadPool threadPool)
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
MessageInfo info = new MessageInfo(stream);
synchronized(this)
@@ -1052,7 +943,7 @@ public final class ConnectionI extends IceInternal.EventHandler
// there could be various race conditions with close
// connection messages and other messages.
//
- threadPool.promoteFollower();
+ threadPool.promoteFollower(this);
if(_state != StateClosed)
{
@@ -1089,20 +980,14 @@ public final class ConnectionI extends IceInternal.EventHandler
public void
finished(IceInternal.ThreadPool threadPool)
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
- threadPool.promoteFollower();
+ threadPool.promoteFollower(null);
Ice.LocalException localEx = null;
synchronized(this)
{
- --_finishedCount;
- if(_finishedCount > 0 || _state != StateClosed || _sendInProgress)
- {
- return;
- }
-
+ assert(threadPool == _threadPool && _state == StateClosed && !_sendInProgress);
+
try
{
_transceiver.close();
@@ -1115,16 +1000,20 @@ public final class ConnectionI extends IceInternal.EventHandler
_transceiver = null;
notifyAll();
}
-
- finishStart(_exception);
- java.util.Iterator<OutgoingMessage> p = _queuedStreams.iterator();
+ if(_startCallback != null)
+ {
+ _startCallback.connectionStartFailed(this, _exception);
+ _startCallback = null;
+ }
+
+ java.util.Iterator<OutgoingMessage> p = _sendStreams.iterator();
while(p.hasNext())
{
OutgoingMessage message = p.next();
message.finished(_exception);
}
- _queuedStreams.clear();
+ _sendStreams.clear();
java.util.Iterator<IceInternal.Outgoing> q =
_requests.values().iterator(); // _requests is immutable at this point.
@@ -1206,10 +1095,19 @@ public final class ConnectionI extends IceInternal.EventHandler
// Operations from SocketReadyCallback
//
public IceInternal.SocketStatus
- socketReady(boolean finished)
+ socketReady()
{
- if(!finished)
+ StartCallback callback = null;
+
+ synchronized(this)
{
+ assert(_sendInProgress);
+
+ if(_state == StateClosed)
+ {
+ return IceInternal.SocketStatus.Finished;
+ }
+
try
{
//
@@ -1218,7 +1116,7 @@ public final class ConnectionI extends IceInternal.EventHandler
//
if(!_sendStreams.isEmpty())
{
- if(!send(0))
+ if(!send())
{
return IceInternal.SocketStatus.NeedWrite;
}
@@ -1226,121 +1124,68 @@ public final class ConnectionI extends IceInternal.EventHandler
}
else
{
- //
- // If there's nothing to send, we're still validating the connection.
- //
- int state;
- synchronized(this)
+ if(_state == StateNotInitialized)
{
- assert(_state == StateClosed || _state <= StateNotValidated);
-
- if(_state == StateClosed)
- {
- assert(_exception != null);
- throw _exception;
- }
-
- state = _state;
- }
-
- if(state == StateNotInitialized)
- {
- IceInternal.SocketStatus status = initialize(0);
+ IceInternal.SocketStatus status = initialize();
if(status != IceInternal.SocketStatus.Finished)
{
return status;
}
}
- if(state <= StateNotValidated)
+ if(_state <= StateNotValidated)
{
- IceInternal.SocketStatus status = validate(0);
+ IceInternal.SocketStatus status = validate();
if(status != IceInternal.SocketStatus.Finished)
{
return status;
}
}
- finishStart(null);
+ callback = _startCallback;
+ _startCallback = null;
}
}
catch(Ice.LocalException ex)
{
- synchronized(this)
- {
- setState(StateClosed, ex);
- }
- }
- }
-
- //
- // If there's no more data to send or if connection validation is finished, we checkout
- // the connection state to figure out whether or not it's time to unregister with the
- // selector thread.
- //
-
- synchronized(this)
- {
- assert(_sendInProgress);
- if(_state == StateClosed)
- {
- assert(_startCallback == null || (!_threadPerConnection && !_registeredWithPool));
-
- _queuedStreams.addAll(0, _sendStreams);
- _sendInProgress = false;
-
- if(_threadPerConnection)
- {
- _transceiver.shutdownReadWrite();
- }
- else
- {
- if(!_registeredWithPool)
- {
- _threadPool.execute(new CallFinished(this));
- ++_finishedCount; // For each unregistration, finished() is called once.
- }
- else
- {
- unregisterWithPool();
- }
- }
-
- notifyAll();
- return IceInternal.SocketStatus.Finished;
- }
- else if(_queuedStreams.isEmpty())
- {
- _sendInProgress = false;
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
- }
+ setState(StateClosed, ex);
return IceInternal.SocketStatus.Finished;
}
- else
+
+ assert(_sendStreams.isEmpty());
+ _selectorThread.unregister(_socketReadyCallback);
+ _sendInProgress = false;
+ if(_acmTimeout > 0)
{
- java.util.LinkedList<OutgoingMessage> streams = _queuedStreams;
- _queuedStreams = _sendStreams;
- _sendStreams = streams;
- return IceInternal.SocketStatus.NeedWrite; // We're not finished yet, there's more data to send!
+ _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
}
}
+
+ if(callback != null)
+ {
+ callback.connectionStartCompleted(this);
+ }
+ return IceInternal.SocketStatus.Finished;
}
- public void
+ public synchronized void
+ socketFinished()
+ {
+ assert(_sendInProgress && _state == StateClosed);
+ _sendInProgress = false;
+ _threadPool.finish(this);
+ }
+
+ public synchronized void
socketTimeout()
{
- synchronized(this)
+ if(_state <= StateNotValidated)
{
- if(_state <= StateNotValidated)
- {
- setState(StateClosed, new ConnectTimeoutException());
- }
- else if(_state <= StateClosing)
- {
- setState(StateClosed, new TimeoutException());
- }
+ setState(StateClosed, new ConnectTimeoutException());
+ }
+ else if(_state <= StateClosing)
+ {
+ setState(StateClosed, new TimeoutException());
}
}
@@ -1357,12 +1202,11 @@ public final class ConnectionI extends IceInternal.EventHandler
}
public ConnectionI(IceInternal.Instance instance, IceInternal.Transceiver transceiver,
- IceInternal.EndpointI endpoint, ObjectAdapter adapter, boolean threadPerConnection)
+ IceInternal.EndpointI endpoint, ObjectAdapter adapter)
{
super(instance);
final Ice.InitializationData initData = instance.initializationData();
- _threadPerConnection = threadPerConnection;
_transceiver = transceiver;
_desc = transceiver.toString();
_type = transceiver.type();
@@ -1370,8 +1214,6 @@ public final class ConnectionI extends IceInternal.EventHandler
_adapter = adapter;
_logger = initData.logger; // Cached for better performance.
_traceLevels = instance.traceLevels(); // Cached for better performance.
- _registeredWithPool = false;
- _finishedCount = 0;
_warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false;
_cacheBuffers = initData.properties.getPropertyAsIntWithDefault("Ice.CacheMessageBuffers", 1) == 1;
_acmAbsoluteTimeoutMillis = 0;
@@ -1425,26 +1267,13 @@ public final class ConnectionI extends IceInternal.EventHandler
try
{
- if(!threadPerConnection)
+ if(_adapter != null)
{
- //
- // Only set _threadPool if we really need it, i.e., if we are
- // not in thread per connection mode. Thread pools have lazy
- // initialization in Instance, and we don't want them to be
- // created if they are not needed.
- //
- if(_adapter != null)
- {
- _threadPool = ((ObjectAdapterI)_adapter).getThreadPool();
- }
- else
- {
- _threadPool = _instance.clientThreadPool();
- }
+ _threadPool = ((ObjectAdapterI)_adapter).getThreadPool();
}
else
{
- _threadPool = null; // To satisfy the compiler.
+ _threadPool = _instance.clientThreadPool();
}
_selectorThread = _instance.selectorThread();
@@ -1472,8 +1301,7 @@ public final class ConnectionI extends IceInternal.EventHandler
IceUtilInternal.Assert.FinalizerAssert(_state == StateClosed);
IceUtilInternal.Assert.FinalizerAssert(_transceiver == null);
IceUtilInternal.Assert.FinalizerAssert(_dispatchCount == 0);
- IceUtilInternal.Assert.FinalizerAssert(_thread == null);
- IceUtilInternal.Assert.FinalizerAssert(_queuedStreams.isEmpty());
+ IceUtilInternal.Assert.FinalizerAssert(_sendStreams.isEmpty());
IceUtilInternal.Assert.FinalizerAssert(_requests.isEmpty());
IceUtilInternal.Assert.FinalizerAssert(_asyncRequests.isEmpty());
@@ -1589,10 +1417,7 @@ public final class ConnectionI extends IceInternal.EventHandler
{
return;
}
- if(!_threadPerConnection)
- {
- registerWithPool();
- }
+ _threadPool._register(this);
break;
}
@@ -1606,10 +1431,7 @@ public final class ConnectionI extends IceInternal.EventHandler
{
return;
}
- if(!_threadPerConnection)
- {
- unregisterWithPool();
- }
+ _threadPool.unregister(this);
break;
}
@@ -1622,10 +1444,7 @@ public final class ConnectionI extends IceInternal.EventHandler
{
return;
}
- if(!_threadPerConnection)
- {
- registerWithPool(); // We need to continue to read in closing state.
- }
+ _threadPool._register(this);
break;
}
@@ -1640,39 +1459,12 @@ public final class ConnectionI extends IceInternal.EventHandler
// The selector thread will register again the FD with the pool once it's
// done.
//
- _selectorThread.unregister(_transceiver.fd());
- if(!_threadPerConnection)
- {
- unregisterWithPool();
- }
-
- _transceiver.shutdownWrite();
- }
- else if(_threadPerConnection)
- {
- //
- // If we are in thread per connection mode and the thread is started, we
- // shutdown both for reading and writing. This will unblock the read call
- // with an exception. The thread per connection closes the transceiver.
- //
- _transceiver.shutdownReadWrite();
+ _selectorThread.finish(_socketReadyCallback);
+ _threadPool.unregister(this);
}
else
{
- if(!_registeredWithPool)
- {
- _threadPool.execute(new CallFinished(this));
- ++_finishedCount; // For each unregistration, finished() is called once.
- }
- else
- {
- unregisterWithPool();
- }
-
- //
- // Prevent further writes.
- //
- _transceiver.shutdownWrite();
+ _threadPool.finish(this);
}
break;
}
@@ -1752,46 +1544,24 @@ public final class ConnectionI extends IceInternal.EventHandler
}
private IceInternal.SocketStatus
- initialize(int timeout)
+ initialize()
{
- try
- {
- IceInternal.SocketStatus status = _transceiver.initialize(timeout);
- if(status != IceInternal.SocketStatus.Finished)
- {
- if(timeout != 0)
- {
- throw new Ice.TimeoutException();
- }
- return status;
- }
- }
- catch(Ice.TimeoutException ex)
+ IceInternal.SocketStatus status = _transceiver.initialize();
+ if(status != IceInternal.SocketStatus.Finished)
{
- throw new Ice.ConnectTimeoutException();
- }
-
- synchronized(this)
- {
- if(_state == StateClosed)
- {
- assert(_exception != null);
- throw _exception;
- }
-
- //
- // Update the connection description once the transceiver is initialized.
- //
- _desc = _transceiver.toString();
-
- setState(StateNotValidated);
+ return status;
}
+ //
+ // Update the connection description once the transceiver is initialized.
+ //
+ _desc = _transceiver.toString();
+ setState(StateNotValidated);
return IceInternal.SocketStatus.Finished;
}
private IceInternal.SocketStatus
- validate(int timeout)
+ validate()
{
if(!_endpoint.datagram()) // Datagram connections are always implicitly validated.
{
@@ -1811,26 +1581,10 @@ public final class ConnectionI extends IceInternal.EventHandler
IceInternal.TraceUtil.traceSend(os, _logger, _traceLevels);
os.prepareWrite();
}
- else
- {
- // The stream can only be non-empty if we're doing a non-blocking connection validation.
- assert(!_threadPerConnection);
- }
- try
- {
- if(!_transceiver.write(os.getBuffer(), timeout))
- {
- if(timeout != 0)
- {
- throw new Ice.TimeoutException();
- }
- return IceInternal.SocketStatus.NeedWrite;
- }
- }
- catch(Ice.TimeoutException ex)
+ if(!_transceiver.write(os.getBuffer()))
{
- throw new Ice.ConnectTimeoutException();
+ return IceInternal.SocketStatus.NeedWrite;
}
}
else // The client side has the passive role for connection validation.
@@ -1841,26 +1595,10 @@ public final class ConnectionI extends IceInternal.EventHandler
is.resize(IceInternal.Protocol.headerSize, true);
is.pos(0);
}
- else
- {
- // The stream can only be non-empty if we're doing a non-blocking connection validation.
- assert(!_threadPerConnection);
- }
- try
+ if(!_transceiver.read(is.getBuffer(), _hasMoreData))
{
- if(!_transceiver.read(is.getBuffer(), timeout, _hasMoreData))
- {
- if(timeout != 0)
- {
- throw new Ice.TimeoutException();
- }
- return IceInternal.SocketStatus.NeedRead;
- }
- }
- catch(Ice.TimeoutException ex)
- {
- throw new Ice.ConnectTimeoutException();
+ return IceInternal.SocketStatus.NeedRead;
}
assert(is.pos() == IceInternal.Protocol.headerSize);
@@ -1910,31 +1648,23 @@ public final class ConnectionI extends IceInternal.EventHandler
}
}
- synchronized(this)
- {
- _stream.reset();
-
- if(_state == StateClosed)
- {
- assert(_exception != null);
- throw _exception;
- }
-
- //
- // We start out in holding state.
- //
- setState(StateHolding);
- }
+ _stream.reset();
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
return IceInternal.SocketStatus.Finished;
}
private boolean
- send(int timeout)
+ send()
{
assert(_transceiver != null);
assert(!_sendStreams.isEmpty());
+ boolean flushSentCallbacks = _sentCallbacks.isEmpty();
+
while(!_sendStreams.isEmpty())
{
OutgoingMessage message = _sendStreams.getFirst();
@@ -1958,34 +1688,56 @@ public final class ConnectionI extends IceInternal.EventHandler
}
- if(!_transceiver.write(message.stream.getBuffer(), timeout))
+ if(!_transceiver.write(message.stream.getBuffer()))
{
- assert(timeout == 0);
+ if(flushSentCallbacks && !_sentCallbacks.isEmpty())
+ {
+ _threadPool.execute(_flushSentCallbacks);
+ }
return false;
}
- message.sent(this, timeout == 0); // timeout == 0 indicates that this is called by the selector thread.
+ message.sent(this, true);
+
+ if(message.outAsync instanceof Ice.AMISentCallback)
+ {
+ _sentCallbacks.add(message);
+ }
+
_sendStreams.removeFirst();
}
+ if(flushSentCallbacks && !_sentCallbacks.isEmpty())
+ {
+ _threadPool.execute(_flushSentCallbacks);
+ }
return true;
}
+ private void
+ flushSentCallbacks()
+ {
+ java.util.List<OutgoingMessage> sentCallbacks;
+ synchronized(this)
+ {
+ assert(_sentCallbacks != null && !_sentCallbacks.isEmpty());
+ sentCallbacks = _sentCallbacks;
+ _sentCallbacks = new java.util.LinkedList<OutgoingMessage>();
+ }
+ for(OutgoingMessage message : sentCallbacks)
+ {
+ message.outAsync.__sent(_instance);
+ }
+ }
+
private boolean
sendMessage(OutgoingMessage message)
{
assert(_state != StateClosed);
-
- //
- // If another thread is currently sending messages, we queue the
- // message in _queuedStreams. It will be picked up eventually by
- // the selector thread once the messages from _sendStreams are all
- // sent.
- //
if(_sendInProgress)
{
message.adopt();
- _queuedStreams.addLast(message);
+ _sendStreams.addLast(message);
return false;
}
@@ -2015,7 +1767,7 @@ public final class ConnectionI extends IceInternal.EventHandler
IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels);
}
- if(_transceiver.write(message.stream.getBuffer(), 0))
+ if(_transceiver.write(message.stream.getBuffer()))
{
message.sent(this, false);
if(_acmTimeout > 0)
@@ -2028,64 +1780,10 @@ public final class ConnectionI extends IceInternal.EventHandler
_sendStreams.addLast(message);
_sendInProgress = true;
message.adopt();
- _selectorThread._register(_transceiver.fd(), this, IceInternal.SocketStatus.NeedWrite, _endpoint.timeout());
+ _selectorThread._register(_socketReadyCallback, IceInternal.SocketStatus.NeedWrite, _endpoint.timeout());
return false;
}
- private void
- finishStart(Ice.LocalException ex)
- {
- //
- // We set _startCallback to null to break potential cyclic reference count
- // and because the finalizer checks for it to ensure that we always invoke
- // on the callback.
- //
-
- StartCallback callback = null;
- synchronized(this)
- {
- callback = _startCallback;
- _startCallback = null;
- }
-
- if(callback != null)
- {
- if(ex == null)
- {
- callback.connectionStartCompleted(this);
- }
- else
- {
- callback.connectionStartFailed(this, ex);
- }
- }
- }
-
- private void
- registerWithPool()
- {
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
- if(!_registeredWithPool)
- {
- _threadPool._register(_transceiver.fd(), this);
- _registeredWithPool = true;
- }
- }
-
- private void
- unregisterWithPool()
- {
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
- if(_registeredWithPool)
- {
- _threadPool.unregister(_transceiver.fd());
- _registeredWithPool = false;
- ++_finishedCount; // For each unregistration, finished() is called once.
- }
- }
-
private IceInternal.BasicStream
doCompress(IceInternal.BasicStream uncompressed, boolean compress)
{
@@ -2166,9 +1864,8 @@ public final class ConnectionI extends IceInternal.EventHandler
try
{
//
- // We don't need to check magic and version here. This has
- // already been done by the ThreadPool or ThreadPerConnection,
- // which provides us with the stream.
+ // We don't need to check magic and version here. This has already
+ // been done by the ThreadPool which provides us with the stream.
//
assert(info.stream.pos() == info.stream.size());
info.stream.pos(8);
@@ -2392,305 +2089,6 @@ public final class ConnectionI extends IceInternal.EventHandler
}
private void
- run()
- {
- try
- {
- //
- // Initialize the connection transceiver and validate the connection using
- // blocking operations.
- //
-
- IceInternal.SocketStatus status;
-
- int timeout;
- IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideConnectTimeout)
- {
- timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
- }
- else
- {
- timeout = _endpoint.timeout();
- }
-
- status = initialize(timeout);
- assert(status == IceInternal.SocketStatus.Finished);
-
- status = validate(timeout);
- assert(status == IceInternal.SocketStatus.Finished);
- }
- catch(LocalException ex)
- {
- synchronized(this)
- {
- setState(StateClosed, ex);
-
- if(_transceiver != null)
- {
- try
- {
- _transceiver.close();
- }
- catch(LocalException e)
- {
- // Here we ignore any exceptions in close().
- }
-
- _transceiver = null;
- }
- notifyAll();
- }
-
- finishStart(_exception);
- return;
- }
-
- finishStart(null);
-
- boolean warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
-
- boolean closed = false;
-
- IceInternal.BasicStream stream = new IceInternal.BasicStream(_instance);
- while(!closed)
- {
- //
- // We must read new messages outside the thread synchronization because we use blocking reads.
- //
-
- try
- {
- try
- {
- stream.resize(IceInternal.Protocol.headerSize, true);
- stream.pos(0);
- _transceiver.read(stream.getBuffer(), -1, _hasMoreData);
-
- int pos = stream.pos();
- if(pos < IceInternal.Protocol.headerSize)
- {
- //
- // This situation is possible for small UDP packets.
- //
- throw new IllegalMessageSizeException();
- }
- stream.pos(0);
- byte[] m = stream.readBlob(4);
- if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] ||
- m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
- {
- BadMagicException ex = new BadMagicException();
- ex.badMagic = m;
- throw ex;
- }
- byte pMajor = stream.readByte();
- byte pMinor = stream.readByte();
- if(pMajor != IceInternal.Protocol.protocolMajor)
- {
- UnsupportedProtocolException e = new UnsupportedProtocolException();
- e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor;
- e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor;
- e.major = IceInternal.Protocol.protocolMajor;
- e.minor = IceInternal.Protocol.protocolMinor;
- throw e;
- }
- byte eMajor = stream.readByte();
- byte eMinor = stream.readByte();
- if(eMajor != IceInternal.Protocol.encodingMajor)
- {
- UnsupportedEncodingException e = new UnsupportedEncodingException();
- e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor;
- e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor;
- e.major = IceInternal.Protocol.encodingMajor;
- e.minor = IceInternal.Protocol.encodingMinor;
- throw e;
- }
- byte messageType = stream.readByte();
- byte compress = stream.readByte();
- int size = stream.readInt();
- if(size < IceInternal.Protocol.headerSize)
- {
- throw new IllegalMessageSizeException();
- }
- if(size > _instance.messageSizeMax())
- {
- throw new MemoryLimitException();
- }
- if(size > stream.size())
- {
- stream.resize(size, true);
- }
- stream.pos(pos);
-
- if(pos != stream.size())
- {
- if(_endpoint.datagram())
- {
- if(warnUdp)
- {
- _logger.warning("DatagramLimitException: maximum size of " + pos + " exceeded");
- }
- throw new DatagramLimitException();
- }
- else
- {
- _transceiver.read(stream.getBuffer(), -1, _hasMoreData);
- assert(stream.pos() == stream.size());
- }
- }
- }
- catch(DatagramLimitException ex) // Expected.
- {
- continue;
- }
- catch(SocketException ex)
- {
- exception(ex);
- }
- catch(LocalException ex)
- {
- if(_endpoint.datagram())
- {
- if(_warn)
- {
- _logger.warning("datagram connection exception:\n" + ex + "\n" + _desc);
- }
- continue;
- }
- else
- {
- exception(ex);
- }
- }
-
- MessageInfo info = new MessageInfo(stream);
-
- LocalException localEx = null;
-
- synchronized(this)
- {
- while(_state == StateHolding)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
-
- if(_state != StateClosed)
- {
- parseMessage(info);
- }
-
- //
- // parseMessage() can close the connection, so we must
- // check for closed state again.
- //
- if(_state == StateClosed)
- {
- if(_sendInProgress)
- {
- _selectorThread.unregister(_transceiver.fd());
- }
-
- //
- // Prevent further writes.
- //
- _transceiver.shutdownWrite();
-
- while(_sendInProgress)
- {
- try
- {
- wait();
- }
- catch(java.lang.Exception ex)
- {
- }
- }
-
- try
- {
- _transceiver.close();
- }
- catch(LocalException ex)
- {
- localEx = ex;
- }
- _transceiver = null;
- notifyAll();
-
- //
- // We cannot simply return here. We have to make sure
- // that all requests (regular and async) are notified
- // about the closed connection below.
- //
- closed = true;
- }
- }
-
- //
- // Asynchronous replies must be handled outside the thread
- // synchronization, so that nested calls are possible.
- //
- if(info.outAsync != null)
- {
- info.outAsync.__finished(info.stream);
- }
-
- //
- // Method invocation (or multiple invocations for batch messages)
- // must be done outside the thread synchronization, so that nested
- // calls are possible.
- //
- invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager,
- info.adapter);
-
- if(closed)
- {
- java.util.Iterator<OutgoingMessage> p = _queuedStreams.iterator();
- while(p.hasNext())
- {
- OutgoingMessage message = p.next();
- message.finished(_exception);
- }
- _queuedStreams.clear();
-
- java.util.Iterator<IceInternal.Outgoing> q = _requests.values().iterator();
- while(q.hasNext())
- {
- IceInternal.Outgoing out = q.next();
- out.finished(_exception); // The exception is immutable at this point.
- }
- _requests.clear();
-
- java.util.Iterator<IceInternal.OutgoingAsync> r = _asyncRequests.values().iterator();
- while(r.hasNext())
- {
- IceInternal.OutgoingAsync out = r.next();
- out.__finished(_exception); // The exception is immutable at this point.
- }
- _asyncRequests.clear();
- }
-
- if(localEx != null)
- {
- assert(closed);
- throw localEx;
- }
- }
- finally
- {
- stream.reset();
- }
- }
- }
-
- private void
warning(String msg, Exception ex)
{
java.io.StringWriter sw = new java.io.StringWriter();
@@ -2809,34 +2207,6 @@ public final class ConnectionI extends IceInternal.EventHandler
}
}
- private class ThreadPerConnection extends Thread
- {
- public void
- run()
- {
- if(ConnectionI.this._instance.initializationData().threadHook != null)
- {
- ConnectionI.this._instance.initializationData().threadHook.start();
- }
-
- try
- {
- ConnectionI.this.run();
- }
- catch(Exception ex)
- {
- ConnectionI.this.error("exception in thread per connection", ex);
- }
- finally
- {
- if(ConnectionI.this._instance.initializationData().threadHook != null)
- {
- ConnectionI.this._instance.initializationData().threadHook.stop();
- }
- }
- }
- }
-
private static class OutgoingMessage
{
OutgoingMessage(IceInternal.BasicStream stream, boolean compress, boolean adopt)
@@ -2919,8 +2289,46 @@ public final class ConnectionI extends IceInternal.EventHandler
boolean prepared;
}
- private Thread _thread;
- private final boolean _threadPerConnection;
+ static class SocketReadyCallback extends IceInternal.SelectorThread.SocketReadyCallback
+ {
+ public
+ SocketReadyCallback(ConnectionI connection)
+ {
+ _connection = connection;
+ }
+
+ public java.nio.channels.SelectableChannel
+ fd()
+ {
+ return _connection.fd();
+ }
+
+ public boolean
+ hasMoreData()
+ {
+ return _connection.hasMoreData();
+ }
+
+ public IceInternal.SocketStatus
+ socketReady()
+ {
+ return _connection.socketReady();
+ }
+
+ public void
+ socketFinished()
+ {
+ _connection.socketFinished();
+ }
+
+ public void
+ runTimerTask()
+ {
+ _connection.socketTimeout();
+ }
+
+ final private ConnectionI _connection;
+ };
private IceInternal.Transceiver _transceiver;
private Ice.BooleanHolder _hasMoreData = new Ice.BooleanHolder(false);
@@ -2928,6 +2336,7 @@ public final class ConnectionI extends IceInternal.EventHandler
private String _desc;
private final String _type;
private final IceInternal.EndpointI _endpoint;
+ private final SocketReadyCallback _socketReadyCallback = new SocketReadyCallback(this);
private ObjectAdapter _adapter;
private IceInternal.ServantManager _servantManager;
@@ -2935,8 +2344,6 @@ public final class ConnectionI extends IceInternal.EventHandler
private final Logger _logger;
private final IceInternal.TraceLevels _traceLevels;
- private boolean _registeredWithPool;
- private int _finishedCount;
private final IceInternal.ThreadPool _threadPool;
private final IceInternal.SelectorThread _selectorThread;
@@ -2965,10 +2372,20 @@ public final class ConnectionI extends IceInternal.EventHandler
private boolean _batchRequestCompress;
private int _batchMarker;
- private java.util.LinkedList<OutgoingMessage> _queuedStreams = new java.util.LinkedList<OutgoingMessage>();
private java.util.LinkedList<OutgoingMessage> _sendStreams = new java.util.LinkedList<OutgoingMessage>();
private boolean _sendInProgress;
+ private java.util.List<OutgoingMessage> _sentCallbacks = new java.util.LinkedList<OutgoingMessage>();
+ private IceInternal.ThreadPoolWorkItem _flushSentCallbacks = new IceInternal.ThreadPoolWorkItem()
+ {
+ public void
+ execute(IceInternal.ThreadPool threadPool)
+ {
+ threadPool.promoteFollower(null);
+ ConnectionI.this.flushSentCallbacks();
+ };
+ };
+
private int _dispatchCount;
private int _state; // The current state.
diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java
index 6998793fa06..9459d4bdb6a 100644
--- a/java/src/Ice/ObjectAdapterI.java
+++ b/java/src/Ice/ObjectAdapterI.java
@@ -725,15 +725,6 @@ public final class ObjectAdapterI implements ObjectAdapter
return _servantManager;
}
- public boolean
- getThreadPerConnection()
- {
- //
- // No mutex lock necessary, _threadPerConnection is immutable.
- //
- return _threadPerConnection;
- }
-
//
// Only for use by IceInternal.ObjectAdapterFactory
//
@@ -821,23 +812,12 @@ public final class ObjectAdapterI implements ObjectAdapter
try
{
- _threadPerConnection = properties.getPropertyAsInt(_name + ".ThreadPerConnection") > 0;
-
int threadPoolSize = properties.getPropertyAsInt(_name + ".ThreadPool.Size");
int threadPoolSizeMax = properties.getPropertyAsInt(_name + ".ThreadPool.SizeMax");
- if(_threadPerConnection && (threadPoolSize > 0 || threadPoolSizeMax > 0))
- {
- InitializationException ex = new InitializationException();
- ex.reason = "object adapter `" + _name + "' cannot be configured for both\n" +
- "thread pool and thread per connection";
- throw ex;
- }
-
- if(!_threadPerConnection && threadPoolSize == 0 && threadPoolSizeMax == 0)
- {
- _threadPerConnection = _instance.threadPerConnection();
- }
+ //
+ // Create the per-adapter thread pool, if necessary.
+ //
if(threadPoolSize > 0 || threadPoolSizeMax > 0)
{
_threadPool = new IceInternal.ThreadPool(_instance, _name + ".ThreadPool", 0);
@@ -1372,8 +1352,6 @@ public final class ObjectAdapterI implements ObjectAdapter
"ReplicaGroupId",
"Router",
"ProxyOptions",
- "ThreadPerConnection",
- "ThreadPerConnection.StackSize",
"ThreadPool.Size",
"ThreadPool.SizeMax",
"ThreadPool.SizeWarn",
@@ -1447,6 +1425,5 @@ public final class ObjectAdapterI implements ObjectAdapter
private boolean _destroying;
private boolean _destroyed;
private boolean _noConfig;
- private boolean _threadPerConnection;
private Identity _processId = null;
}
diff --git a/java/src/Ice/ObjectPrx.java b/java/src/Ice/ObjectPrx.java
index 4cf498fd19c..cc8bea254ff 100644
--- a/java/src/Ice/ObjectPrx.java
+++ b/java/src/Ice/ObjectPrx.java
@@ -34,9 +34,9 @@ public interface ObjectPrx
boolean ice_invoke(String operation, OperationMode mode, byte[] inParams, ByteSeqHolder outParams,
java.util.Map __context);
- void ice_invoke_async(AMI_Object_ice_invoke cb, String operation, OperationMode mode, byte[] inParams);
- void ice_invoke_async(AMI_Object_ice_invoke cb, String operation, OperationMode mode, byte[] inParams,
- java.util.Map context);
+ boolean ice_invoke_async(AMI_Object_ice_invoke cb, String operation, OperationMode mode, byte[] inParams);
+ boolean ice_invoke_async(AMI_Object_ice_invoke cb, String operation, OperationMode mode, byte[] inParams,
+ java.util.Map context);
Identity ice_getIdentity();
ObjectPrx ice_identity(Identity newIdentity);
@@ -96,14 +96,11 @@ public interface ObjectPrx
ObjectPrx ice_timeout(int t);
ObjectPrx ice_connectionId(String connectionId);
- boolean ice_isThreadPerConnection();
- ObjectPrx ice_threadPerConnection(boolean tpc);
-
Connection ice_getConnection();
Connection ice_getCachedConnection();
void ice_flushBatchRequests();
- void ice_flushBatchRequests_async(AMI_Object_ice_flushBatchRequests cb);
+ boolean ice_flushBatchRequests_async(AMI_Object_ice_flushBatchRequests cb);
boolean equals(java.lang.Object r);
}
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java
index 5621fbbbbbb..0b4bdbb048f 100644
--- a/java/src/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/ObjectPrxHelperBase.java
@@ -251,14 +251,14 @@ public class ObjectPrxHelperBase implements ObjectPrx
}
}
- public final void
+ public final boolean
ice_invoke_async(AMI_Object_ice_invoke cb, String operation, OperationMode mode, byte[] inParams)
{
__checkTwowayOnly("ice_invoke_async");
- cb.__invoke(this, operation, mode, inParams, null);
+ return cb.__invoke(this, operation, mode, inParams, null);
}
- public final void
+ public final boolean
ice_invoke_async(AMI_Object_ice_invoke cb, String operation, OperationMode mode, byte[] inParams,
java.util.Map context)
{
@@ -267,7 +267,7 @@ public class ObjectPrxHelperBase implements ObjectPrx
context = _emptyContext;
}
__checkTwowayOnly("ice_invoke_async");
- cb.__invoke(this, operation, mode, inParams, context);
+ return cb.__invoke(this, operation, mode, inParams, context);
}
public final Identity
@@ -680,26 +680,6 @@ public class ObjectPrxHelperBase implements ObjectPrx
}
}
- public boolean
- ice_isThreadPerConnection()
- {
- return _reference.getThreadPerConnection();
- }
-
- public ObjectPrx
- ice_threadPerConnection(boolean tpc)
- {
- IceInternal.Reference ref = _reference.changeThreadPerConnection(tpc);
- if(ref.equals(_reference))
- {
- return this;
- }
- else
- {
- return newInstance(ref);
- }
- }
-
public final Connection
ice_getConnection()
{
@@ -764,10 +744,10 @@ public class ObjectPrxHelperBase implements ObjectPrx
}
}
- public void
+ public boolean
ice_flushBatchRequests_async(AMI_Object_ice_flushBatchRequests cb)
{
- cb.__invoke(this);
+ return cb.__invoke(this);
}
public final boolean
diff --git a/java/src/IceGridGUI/Coordinator.java b/java/src/IceGridGUI/Coordinator.java
index 769335e1c71..8d9ebb0400c 100755
--- a/java/src/IceGridGUI/Coordinator.java
+++ b/java/src/IceGridGUI/Coordinator.java
@@ -1798,8 +1798,6 @@ public class Coordinator
//
properties.setProperty("Ice.Override.ConnectTimeout", "5000");
- properties.setProperty("Ice.ThreadPerConnection", "1");
-
//
// For Glacier
//
diff --git a/java/src/IceInternal/Acceptor.java b/java/src/IceInternal/Acceptor.java
index 25de96607bc..4c2936175af 100644
--- a/java/src/IceInternal/Acceptor.java
+++ b/java/src/IceInternal/Acceptor.java
@@ -14,7 +14,6 @@ public interface Acceptor
java.nio.channels.ServerSocketChannel fd();
void close();
void listen();
- Transceiver accept(int timeout);
- void connectToSelf();
+ Transceiver accept();
String toString();
}
diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java
index e9d07332a06..6596a4e3375 100644
--- a/java/src/IceInternal/BatchOutgoing.java
+++ b/java/src/IceInternal/BatchOutgoing.java
@@ -57,9 +57,9 @@ public final class BatchOutgoing implements OutgoingMessageCallback
}
public void
- sent(boolean notify)
+ sent(boolean async)
{
- if(notify)
+ if(async)
{
synchronized(this)
{
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 94c3c3721d6..7c0a1db0c0c 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -144,7 +144,7 @@ public class ConnectRequestHandler
}
}
- public void
+ public boolean
sendAsyncRequest(OutgoingAsync out)
throws LocalExceptionWrapper
{
@@ -153,10 +153,10 @@ public class ConnectRequestHandler
if(!initialized())
{
_requests.add(new Request(out));
- return;
+ return false;
}
}
- _connection.sendAsyncRequest(out, _compress, _response);
+ return _connection.sendAsyncRequest(out, _compress, _response);
}
public boolean
@@ -165,7 +165,7 @@ public class ConnectRequestHandler
return getConnection(true).flushBatchRequests(out);
}
- public void
+ public boolean
flushAsyncBatchRequests(BatchOutgoingAsync out)
{
synchronized(this)
@@ -173,10 +173,10 @@ public class ConnectRequestHandler
if(!initialized())
{
_requests.add(new Request(out));
- return;
+ return false;
}
}
- _connection.flushAsyncBatchRequests(out);
+ return _connection.flushAsyncBatchRequests(out);
}
public Outgoing
@@ -297,7 +297,7 @@ public class ConnectRequestHandler
public void
execute(ThreadPool threadPool)
{
- threadPool.promoteFollower();
+ threadPool.promoteFollower(null);
flushRequestsWithException(ex);
};
});
@@ -392,6 +392,8 @@ public class ConnectRequestHandler
_flushing = true;
}
+ final java.util.List<OutgoingAsyncMessageCallback> sentCallbacks =
+ new java.util.ArrayList<OutgoingAsyncMessageCallback>();
try
{
java.util.Iterator<Request> p = _requests.iterator(); // _requests is immutable when _flushing = true
@@ -400,11 +402,23 @@ public class ConnectRequestHandler
Request request = p.next();
if(request.out != null)
{
- _connection.sendAsyncRequest(request.out, _compress, _response);
+ if(_connection.sendAsyncRequest(request.out, _compress, _response))
+ {
+ if(request.out instanceof Ice.AMISentCallback)
+ {
+ sentCallbacks.add(request.out);
+ }
+ }
}
else if(request.batchOut != null)
{
- _connection.flushAsyncBatchRequests(request.batchOut);
+ if(_connection.flushAsyncBatchRequests(request.batchOut))
+ {
+ if(request.batchOut instanceof Ice.AMISentCallback)
+ {
+ sentCallbacks.add(request.batchOut);
+ }
+ }
}
else
{
@@ -436,12 +450,10 @@ public class ConnectRequestHandler
public void
execute(ThreadPool threadPool)
{
- threadPool.promoteFollower();
+ threadPool.promoteFollower(null);
flushRequestsWithException(ex);
};
});
- notifyAll();
- return;
}
}
catch(final Ice.LocalException ex)
@@ -455,15 +467,30 @@ public class ConnectRequestHandler
public void
execute(ThreadPool threadPool)
{
- threadPool.promoteFollower();
+ threadPool.promoteFollower(null);
flushRequestsWithException(ex);
};
});
- notifyAll();
- return;
}
}
-
+
+ if(!sentCallbacks.isEmpty())
+ {
+ final Instance instance = _reference.getInstance();
+ instance.clientThreadPool().execute(new ThreadPoolWorkItem()
+ {
+ public void
+ execute(ThreadPool threadPool)
+ {
+ threadPool.promoteFollower(null);
+ for(OutgoingAsyncMessageCallback callback : sentCallbacks)
+ {
+ callback.__sent(instance);
+ }
+ };
+ });
+ }
+
//
// We've finished sending the queued requests and the request handler now send
// the requests over the connection directly. It's time to substitute the
@@ -480,9 +507,12 @@ public class ConnectRequestHandler
synchronized(this)
{
- assert(!_initialized);
- _initialized = true;
- _flushing = false;
+ if(_exception == null)
+ {
+ assert(!_initialized);
+ _initialized = true;
+ _flushing = false;
+ }
_proxy = null; // Break cyclic reference count.
_delegate = null; // Break cyclic reference count.
notifyAll();
diff --git a/java/src/IceInternal/ConnectionRequestHandler.java b/java/src/IceInternal/ConnectionRequestHandler.java
index 8b74c44b9f1..c02f9e580a2 100644
--- a/java/src/IceInternal/ConnectionRequestHandler.java
+++ b/java/src/IceInternal/ConnectionRequestHandler.java
@@ -39,15 +39,15 @@ public class ConnectionRequestHandler implements RequestHandler
}
else
{
- return null; // The request hasn't been sent yet.
+ return null; // The request has been sent.
}
}
- public void
+ public boolean
sendAsyncRequest(OutgoingAsync out)
throws LocalExceptionWrapper
{
- _connection.sendAsyncRequest(out, _compress, _response);
+ return _connection.sendAsyncRequest(out, _compress, _response);
}
public boolean
@@ -56,10 +56,10 @@ public class ConnectionRequestHandler implements RequestHandler
return _connection.flushBatchRequests(out);
}
- public void
+ public boolean
flushAsyncBatchRequests(BatchOutgoingAsync out)
{
- _connection.flushAsyncBatchRequests(out);
+ return _connection.flushAsyncBatchRequests(out);
}
public Outgoing
diff --git a/java/src/IceInternal/Connector.java b/java/src/IceInternal/Connector.java
index 7a0b27a0b40..cda93a65d91 100644
--- a/java/src/IceInternal/Connector.java
+++ b/java/src/IceInternal/Connector.java
@@ -11,7 +11,7 @@ package IceInternal;
public interface Connector
{
- Transceiver connect(int timeout);
+ Transceiver connect();
short type();
String toString();
diff --git a/java/src/IceInternal/EventHandler.java b/java/src/IceInternal/EventHandler.java
index ed6020fe304..5bb50523d2d 100644
--- a/java/src/IceInternal/EventHandler.java
+++ b/java/src/IceInternal/EventHandler.java
@@ -9,7 +9,7 @@
package IceInternal;
-public abstract class EventHandler
+public abstract class EventHandler extends SelectorHandler
{
//
// Return true if the handler is for a datagram transport, false otherwise.
@@ -28,14 +28,6 @@ public abstract class EventHandler
abstract public boolean read(BasicStream is);
//
- // In Java, it's possible that the transceiver reads more data
- // than what was really asked. If this is the case, hasMoreData()
- // returns true and the handler read() method should be called
- // again (without doing a select()).
- //
- abstract public boolean hasMoreData();
-
- //
// A complete message has been received.
//
abstract public void message(BasicStream stream, ThreadPool threadPool);
@@ -77,4 +69,6 @@ public abstract class EventHandler
// connection for validation.
//
protected BasicStream _stream;
+ boolean _serializing;
+ boolean _registered;
}
diff --git a/java/src/IceInternal/FixedReference.java b/java/src/IceInternal/FixedReference.java
index 376b42ed1e2..afa5744cae9 100644
--- a/java/src/IceInternal/FixedReference.java
+++ b/java/src/IceInternal/FixedReference.java
@@ -73,12 +73,6 @@ public class FixedReference extends Reference
return Ice.EndpointSelectionType.Random;
}
- public boolean
- getThreadPerConnection()
- {
- return false;
- }
-
public int
getLocatorCacheTimeout()
{
@@ -133,12 +127,6 @@ public class FixedReference extends Reference
throw new Ice.FixedProxyException();
}
- public final Reference
- changeThreadPerConnection(boolean newTpc)
- {
- throw new Ice.FixedProxyException();
- }
-
public Reference
changeLocatorCacheTimeout(int newTimeout)
{
diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java
index 4efcaf79318..d231b2eea04 100644
--- a/java/src/IceInternal/IncomingConnectionFactory.java
+++ b/java/src/IceInternal/IncomingConnectionFactory.java
@@ -72,7 +72,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
public void
waitUntilFinished()
{
- Thread threadPerIncomingConnectionFactory = null;
java.util.LinkedList<Ice.ConnectionI> connections = null;
synchronized(this)
@@ -92,9 +91,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
}
}
- threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory;
- _threadPerIncomingConnectionFactory = null;
-
//
// Clear the OA. See bug 1673 for the details of why this is necessary.
//
@@ -110,21 +106,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
}
}
- if(threadPerIncomingConnectionFactory != null)
- {
- while(true)
- {
- try
- {
- threadPerIncomingConnectionFactory.join();
- break;
- }
- catch(InterruptedException ex)
- {
- }
- }
- }
-
if(connections != null)
{
java.util.ListIterator<Ice.ConnectionI> p = connections.listIterator();
@@ -193,35 +174,42 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
}
//
+ // Operations from SelectorHandler.
+ //
+
+ public java.nio.channels.SelectableChannel
+ fd()
+ {
+ assert(_acceptor != null);
+ return _acceptor.fd();
+ }
+
+ public boolean
+ hasMoreData()
+ {
+ assert(_acceptor != null);
+ return false;
+ }
+
+ //
// Operations from EventHandler.
//
public boolean
datagram()
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
return _endpoint.datagram();
}
public boolean
readable()
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
return false;
}
public boolean
read(BasicStream unused)
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
- assert(false); // Must not be called.
- return false;
- }
-
- public boolean
- hasMoreData()
- {
- assert(!_threadPerConnection); // Only for use with a thread pool.
assert(false); // Must not be called.
return false;
}
@@ -229,8 +217,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
public void
message(BasicStream unused, ThreadPool threadPool)
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
Ice.ConnectionI connection = null;
try
@@ -262,7 +248,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
Transceiver transceiver = null;
try
{
- transceiver = _acceptor.accept(0);
+ transceiver = _acceptor.accept();
}
catch(Ice.SocketException ex)
{
@@ -288,8 +274,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
try
{
- assert(!_threadPerConnection);
- connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter, false);
+ connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter);
}
catch(Ice.LocalException ex)
{
@@ -318,7 +303,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
// This makes sure that we promote a follower before we leave the scope of the mutex
// above, but after we call accept() (if we call it).
//
- threadPool.promoteFollower();
+ threadPool.promoteFollower(null);
}
connection.start(this);
@@ -327,19 +312,12 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
public synchronized void
finished(ThreadPool threadPool)
{
- assert(!_threadPerConnection); // Only for use with a thread pool.
-
- threadPool.promoteFollower();
- assert(threadPool == ((Ice.ObjectAdapterI)_adapter).getThreadPool());
+ threadPool.promoteFollower(null);
+ assert(threadPool == ((Ice.ObjectAdapterI)_adapter).getThreadPool() && _state == StateClosed);
- --_finishedCount;
-
- if(_finishedCount == 0 && _state == StateClosed)
- {
- _acceptor.close();
- _acceptor = null;
- notifyAll();
- }
+ _acceptor.close();
+ _acceptor = null;
+ notifyAll();
}
public void
@@ -406,8 +384,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
super(instance);
_endpoint = endpoint;
_adapter = adapter;
- _registeredWithPool = false;
- _finishedCount = 0;
_warn = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false;
_state = StateHolding;
@@ -423,8 +399,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
}
Ice.ObjectAdapterI adapterImpl = (Ice.ObjectAdapterI)_adapter;
- _threadPerConnection = adapterImpl.getThreadPerConnection();
-
try
{
EndpointIHolder h = new EndpointIHolder();
@@ -438,8 +412,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
Ice.ConnectionI connection;
try
{
- connection = new Ice.ConnectionI(_instance, _transceiver, _endpoint, _adapter,
- _threadPerConnection);
+ connection = new Ice.ConnectionI(_instance, _transceiver, _endpoint, _adapter);
}
catch(Ice.LocalException ex)
{
@@ -464,25 +437,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
_endpoint = h.value;
assert(_acceptor != null);
_acceptor.listen();
-
- if(_threadPerConnection)
- {
- //
- // If we are in thread per connection mode, we also use
- // one thread per incoming connection factory, that
- // accepts new connections on this endpoint.
- //
- try
- {
- _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory();
- _threadPerIncomingConnectionFactory.start();
- }
- catch(java.lang.Exception ex)
- {
- error("cannot create thread for incoming connection factory", ex);
- throw ex;
- }
- }
}
}
catch(java.lang.Exception ex)
@@ -508,7 +462,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
_state = StateClosed;
_acceptor = null;
_connections = null;
- _threadPerIncomingConnectionFactory = null;
}
if(ex instanceof Ice.LocalException)
@@ -531,7 +484,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
IceUtilInternal.Assert.FinalizerAssert(_state == StateClosed);
IceUtilInternal.Assert.FinalizerAssert(_acceptor == null);
IceUtilInternal.Assert.FinalizerAssert(_connections == null);
- IceUtilInternal.Assert.FinalizerAssert(_threadPerIncomingConnectionFactory == null);
super.finalize();
}
@@ -556,9 +508,9 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
{
return;
}
- if(!_threadPerConnection && _acceptor != null)
+ if(_acceptor != null)
{
- registerWithPool();
+ ((Ice.ObjectAdapterI)_adapter).getThreadPool()._register(this);
}
java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator();
@@ -576,9 +528,9 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
{
return;
}
- if(!_threadPerConnection && _acceptor != null)
+ if(_acceptor != null)
{
- unregisterWithPool();
+ ((Ice.ObjectAdapterI)_adapter).getThreadPool().unregister(this);
}
java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator();
@@ -594,25 +546,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
{
if(_acceptor != null)
{
- if(_threadPerConnection)
- {
- //
- // If we are in thread per connection mode, we connect
- // to our own acceptor, which unblocks our thread per
- // incoming connection factory stuck in accept().
- //
- _acceptor.connectToSelf();
- }
- else
- {
- //
- // Otherwise we first must make sure that we are
- // registered, then we unregister, and let finished()
- // do the close.
- //
- registerWithPool();
- unregisterWithPool();
- }
+ ((Ice.ObjectAdapterI)_adapter).getThreadPool().finish(this);
}
java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator();
@@ -630,33 +564,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
}
private void
- registerWithPool()
- {
- assert(!_threadPerConnection); // Only for use with a thread pool.
- assert(_acceptor != null);
-
- if(!_registeredWithPool)
- {
- ((Ice.ObjectAdapterI)_adapter).getThreadPool()._register(_acceptor.fd(), this);
- _registeredWithPool = true;
- }
- }
-
- private void
- unregisterWithPool()
- {
- assert(!_threadPerConnection); // Only for use with a thread pool.
- assert(_acceptor != null);
-
- if(_registeredWithPool)
- {
- ((Ice.ObjectAdapterI)_adapter).getThreadPool().unregister(_acceptor.fd());
- _registeredWithPool = false;
- ++_finishedCount; // For each unregistration, finished() is called once.
- }
- }
-
- private void
warning(Ice.LocalException ex)
{
java.io.StringWriter sw = new java.io.StringWriter();
@@ -678,168 +585,15 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
_instance.initializationData().logger.error(s);
}
- private void
- run()
- {
- assert(_acceptor != null);
-
- while(true)
- {
- //
- // We must accept new connections outside the thread
- // synchronization, because we use blocking accept.
- //
- Transceiver transceiver = null;
- try
- {
- transceiver = _acceptor.accept(-1);
- }
- catch(Ice.SocketException ex)
- {
- // Ignore socket exceptions.
- }
- catch(Ice.TimeoutException ex)
- {
- // Ignore timeouts.
- }
- catch(Ice.LocalException ex)
- {
- // Warn about other Ice local exceptions.
- if(_warn)
- {
- warning(ex);
- }
- }
-
- Ice.ConnectionI connection = null;
- synchronized(this)
- {
- while(_state == StateHolding)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
-
- if(_state == StateClosed)
- {
- if(transceiver != null)
- {
- try
- {
- transceiver.close();
- }
- catch(Ice.LocalException ex)
- {
- // Here we ignore any exceptions in close().
- }
- }
-
- try
- {
- _acceptor.close();
- }
- catch(Ice.LocalException ex)
- {
- _acceptor = null;
- notifyAll();
- throw ex;
- }
-
- _acceptor = null;
- notifyAll();
- return;
- }
-
- assert(_state == StateActive);
-
- //
- // Reap connections for which destruction has completed.
- //
- java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator();
- while(p.hasNext())
- {
- Ice.ConnectionI con = p.next();
- if(con.isFinished())
- {
- p.remove();
- }
- }
-
- if(transceiver == null)
- {
- continue;
- }
-
- try
- {
- connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection);
- }
- catch(Ice.LocalException ex)
- {
- try
- {
- transceiver.close();
- }
- catch(Ice.LocalException exc)
- {
- // Ignore
- }
-
- if(_warn)
- {
- warning(ex);
- }
- continue;
- }
- _connections.add(connection);
- }
-
- //
- // In thread-per-connection mode and regardless of the background mode,
- // start() doesn't block. The connection thread is started and takes
- // care of the connection validation and notifies the factory through
- // the callback when it's done.
- //
- connection.start(this);
- }
- }
-
- private class ThreadPerIncomingConnectionFactory extends Thread
- {
- public void
- run()
- {
- try
- {
- IncomingConnectionFactory.this.run();
- }
- catch(Exception ex)
- {
- IncomingConnectionFactory.this.error("exception in thread per incoming connection factory", ex);
- }
- }
- }
- private Thread _threadPerIncomingConnectionFactory;
-
private Acceptor _acceptor;
private final Transceiver _transceiver;
private EndpointI _endpoint;
private Ice.ObjectAdapter _adapter;
- private boolean _registeredWithPool;
- private int _finishedCount;
-
private final boolean _warn;
private java.util.List<Ice.ConnectionI> _connections = new java.util.LinkedList<Ice.ConnectionI>();
private int _state;
-
- private final boolean _threadPerConnection;
}
diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java
index b14f46db598..2a47cba9112 100644
--- a/java/src/IceInternal/Instance.java
+++ b/java/src/IceInternal/Instance.java
@@ -217,24 +217,6 @@ public final class Instance
return _timer;
}
- public boolean
- threadPerConnection()
- {
- return _threadPerConnection;
- }
-
- public int
- threadPerConnectionStackSize()
- {
- return _threadPerConnectionStackSize;
- }
-
- public boolean
- background()
- {
- return _background;
- }
-
public synchronized EndpointFactoryManager
endpointFactoryManager()
{
@@ -680,19 +662,6 @@ public final class Instance
_implicitContext = Ice.ImplicitContextI.create(_initData.properties.getProperty("Ice.ImplicitContext"));
- _threadPerConnection = _initData.properties.getPropertyAsInt("Ice.ThreadPerConnection") > 0;
-
- {
- int stackSize = _initData.properties.getPropertyAsInt("Ice.ThreadPerConnection.StackSize");
- if(stackSize < 0)
- {
- stackSize = 0;
- }
- _threadPerConnectionStackSize = stackSize;
- }
-
- _background = _initData.properties.getPropertyAsInt("Ice.Background") > 0;
-
_routerManager = new RouterManager();
_locatorManager = new LocatorManager();
@@ -1082,9 +1051,6 @@ public final class Instance
private SelectorThread _selectorThread;
private EndpointHostResolver _endpointHostResolver;
private Timer _timer;
- private final boolean _threadPerConnection;
- private final int _threadPerConnectionStackSize;
- private final boolean _background;
private EndpointFactoryManager _endpointFactoryManager;
private Ice.PluginManager _pluginManager;
private java.util.Map<String, String> _defaultContext;
diff --git a/java/src/IceInternal/Network.java b/java/src/IceInternal/Network.java
index 6f205bcda0e..1bdd30b54e4 100644
--- a/java/src/IceInternal/Network.java
+++ b/java/src/IceInternal/Network.java
@@ -280,27 +280,13 @@ public final class Network
}
public static boolean
- doConnect(java.nio.channels.SocketChannel fd, java.net.InetSocketAddress addr, int timeout)
+ doConnect(java.nio.channels.SocketChannel fd, java.net.InetSocketAddress addr)
{
try
{
if(!fd.connect(addr))
{
- if(timeout == 0)
- {
- return false;
- }
-
- try
- {
- doFinishConnect(fd, timeout);
- }
- catch(Ice.LocalException ex)
- {
- closeSocketNoThrow(fd);
- throw ex;
- }
- return true;
+ return false;
}
}
catch(java.net.ConnectException ex)
@@ -336,75 +322,13 @@ public final class Network
}
public static void
- doFinishConnect(java.nio.channels.SocketChannel fd, int timeout)
+ doFinishConnect(java.nio.channels.SocketChannel fd)
{
//
// Note: we don't close the socket if there's an exception. It's the responsibility
// of the caller to do so.
//
- if(timeout != 0)
- {
- try
- {
- java.nio.channels.Selector selector = java.nio.channels.Selector.open();
- try
- {
- while(true)
- {
- try
- {
- java.nio.channels.SelectionKey key =
- fd.register(selector, java.nio.channels.SelectionKey.OP_CONNECT);
- int n;
- if(timeout > 0)
- {
- n = selector.select(timeout);
- }
- else
- {
- n = selector.select();
- }
-
- if(n == 0)
- {
- throw new Ice.ConnectTimeoutException();
- }
-
- break;
- }
- catch(java.io.IOException ex)
- {
- if(interrupted(ex))
- {
- continue;
- }
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
- }
- finally
- {
- try
- {
- selector.close();
- }
- catch(java.io.IOException ex)
- {
- // Ignore
- }
- }
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
-
try
{
if(!fd.finishConnect())
@@ -446,7 +370,7 @@ public final class Network
}
public static void
- doConnect(java.nio.channels.DatagramChannel fd, java.net.InetSocketAddress addr, int timeout)
+ doConnect(java.nio.channels.DatagramChannel fd, java.net.InetSocketAddress addr)
{
try
{
@@ -949,58 +873,18 @@ public final class Network
createPipe()
{
SocketPair fds = new SocketPair();
-
- //
- // BUGFIX: This method should really be very simple.
- // Unfortunately, using a pipe causes a kernel crash under
- // MacOS 10.3.9.
- //
- //try
- //{
- // java.nio.channels.Pipe pipe = java.nio.channels.Pipe.open();
- // fds.sink = pipe.sink();
- // fds.source = pipe.source();
- //}
- //catch(java.io.IOException ex)
- //{
- // Ice.SocketException se = new Ice.SocketException();
- // se.initCause(ex);
- // throw se;
- //}
- //
-
- java.nio.channels.ServerSocketChannel fd = createTcpServerSocket();
-
- java.net.InetSocketAddress addr = new java.net.InetSocketAddress("127.0.0.1", 0);
-
- addr = doBind(fd, addr, 0);
-
try
{
- java.nio.channels.SocketChannel sink = createTcpSocket();
- fds.sink = sink;
- doConnect(sink, addr, -1);
- try
- {
- fds.source = doAccept(fd, -1);
- }
- catch(Ice.LocalException ex)
- {
- try
- {
- fds.sink.close();
- }
- catch(java.io.IOException e)
- {
- }
- throw ex;
- }
+ java.nio.channels.Pipe pipe = java.nio.channels.Pipe.open();
+ fds.sink = pipe.sink();
+ fds.source = pipe.source();
}
- finally
+ catch(java.io.IOException ex)
{
- closeSocketNoThrow(fd);
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
}
-
return fds;
}
diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java
index a454993f852..1a72c7d8aa4 100644
--- a/java/src/IceInternal/Outgoing.java
+++ b/java/src/IceInternal/Outgoing.java
@@ -258,9 +258,9 @@ public final class Outgoing implements OutgoingMessageCallback
}
public void
- sent(boolean notify)
+ sent(boolean async)
{
- if(notify)
+ if(async)
{
synchronized(this)
{
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index ee90ed1d2a5..9359fe4e36f 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -280,7 +280,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
__send();
}
- public final void
+ public final boolean
__send()
{
try
@@ -288,8 +288,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
_sent = false;
_response = false;
_delegate = _proxy.__getDelegate(true);
- _delegate.__getRequestHandler().sendAsyncRequest(this);
- return;
+ _sentSynchronously = _delegate.__getRequestHandler().sendAsyncRequest(this);
}
catch(LocalExceptionWrapper ex)
{
@@ -299,6 +298,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
{
handleException(ex);
}
+ return _sentSynchronously;
}
protected final void
@@ -470,6 +470,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
}
private boolean _sent;
+ private boolean _sentSynchronously;
private boolean _response;
private Ice.ObjectPrxHelperBase _proxy;
private Ice._ObjectDel _delegate;
diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
index 78b5ddb17c9..a0d21e7ad90 100644
--- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java
+++ b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
@@ -22,6 +22,19 @@ abstract public class OutgoingAsyncMessageCallback
}
public void
+ __sent(Instance instance)
+ {
+ try
+ {
+ ((Ice.AMISentCallback)this).ice_sent();
+ }
+ catch(java.lang.Exception ex)
+ {
+ __warning(instance, ex);
+ }
+ }
+
+ public void
__exception(Ice.LocalException exc)
{
try
@@ -94,7 +107,7 @@ abstract public class OutgoingAsyncMessageCallback
public void
execute(ThreadPool threadPool)
{
- threadPool.promoteFollower();
+ threadPool.promoteFollower(null);
__exception(ex);
}
});
@@ -125,20 +138,25 @@ abstract public class OutgoingAsyncMessageCallback
protected void
__warning(java.lang.Exception ex)
{
- if(__os != null) // Don't print anything if release() was already called.
+ if(__os != null)
{
- Instance instance = __os.instance();
- if(instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- IceUtilInternal.OutputBase out = new IceUtilInternal.OutputBase(pw);
- out.setUseTab(false);
- out.print("exception raised by AMI callback:\n");
- ex.printStackTrace(pw);
- pw.flush();
- instance.initializationData().logger.warning(sw.toString());
- }
+ __warning(__os.instance(), ex);
+ }
+ }
+
+ protected void
+ __warning(Instance instance, java.lang.Exception ex)
+ {
+ if(instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ IceUtilInternal.OutputBase out = new IceUtilInternal.OutputBase(pw);
+ out.setUseTab(false);
+ out.print("exception raised by AMI callback:\n");
+ ex.printStackTrace(pw);
+ pw.flush();
+ instance.initializationData().logger.warning(sw.toString());
}
}
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index fec7e825ab0..58177812f6d 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -106,8 +106,7 @@ public final class OutgoingConnectionFactory
}
public Ice.ConnectionI
- create(EndpointI[] endpts, boolean hasMore, boolean tpc, Ice.EndpointSelectionType selType,
- Ice.BooleanHolder compress)
+ create(EndpointI[] endpts, boolean hasMore, Ice.EndpointSelectionType selType, Ice.BooleanHolder compress)
{
assert(endpts.length > 0);
@@ -119,7 +118,7 @@ public final class OutgoingConnectionFactory
//
// Try to find a connection to one of the given endpoints.
//
- Ice.ConnectionI connection = findConnection(endpoints, tpc, compress);
+ Ice.ConnectionI connection = findConnectionByEndpoint(endpoints, compress);
if(connection != null)
{
return connection;
@@ -156,7 +155,7 @@ public final class OutgoingConnectionFactory
java.util.Iterator<Connector> q = cons.iterator();
while(q.hasNext())
{
- connectors.add(new ConnectorInfo(q.next(), endpoint, tpc));
+ connectors.add(new ConnectorInfo(q.next(), endpoint));
}
}
catch(Ice.LocalException ex)
@@ -194,21 +193,7 @@ public final class OutgoingConnectionFactory
ConnectorInfo ci = q.next();
try
{
- int timeout;
- if(defaultsAndOverrides.overrideConnectTimeout)
- {
- timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
- }
- else
- {
- //
- // It is not necessary to check for overrideTimeout, the endpoint has already
- // been modified with this override, if set.
- //
- timeout = ci.endpoint.timeout();
- }
-
- connection = createConnection(ci.connector.connect(timeout), ci);
+ connection = createConnection(ci.connector.connect(), ci);
connection.start(null);
if(defaultsAndOverrides.overrideCompress)
@@ -253,7 +238,7 @@ public final class OutgoingConnectionFactory
}
public void
- create(EndpointI[] endpts, boolean hasMore, boolean tpc, Ice.EndpointSelectionType selType,
+ create(EndpointI[] endpts, boolean hasMore, Ice.EndpointSelectionType selType,
CreateConnectionCallback callback)
{
assert(endpts.length > 0);
@@ -269,7 +254,7 @@ public final class OutgoingConnectionFactory
try
{
Ice.BooleanHolder compress = new Ice.BooleanHolder();
- Ice.ConnectionI connection = findConnection(endpoints, tpc, compress);
+ Ice.ConnectionI connection = findConnectionByEndpoint(endpoints, compress);
if(connection != null)
{
callback.setConnection(connection, compress.value);
@@ -282,7 +267,7 @@ public final class OutgoingConnectionFactory
return;
}
- ConnectCallback cb = new ConnectCallback(this, endpoints, hasMore, callback, selType, tpc);
+ ConnectCallback cb = new ConnectCallback(this, endpoints, hasMore, callback, selType);
cb.getConnectors();
}
@@ -469,7 +454,7 @@ public final class OutgoingConnectionFactory
}
synchronized private Ice.ConnectionI
- findConnection(java.util.List<EndpointI> endpoints, boolean tpc, Ice.BooleanHolder compress)
+ findConnectionByEndpoint(java.util.List<EndpointI> endpoints, Ice.BooleanHolder compress)
{
if(_destroyed)
{
@@ -493,8 +478,7 @@ public final class OutgoingConnectionFactory
while(q.hasNext())
{
Ice.ConnectionI connection = q.next();
- if(connection.isActiveOrHolding() &&
- connection.threadPerConnection() == tpc) // Don't return destroyed or un-validated connections
+ if(connection.isActiveOrHolding()) // Don't return destroyed or un-validated connections
{
if(defaultsAndOverrides.overrideCompress)
{
@@ -787,8 +771,7 @@ public final class OutgoingConnectionFactory
throw new Ice.CommunicatorDestroyedException();
}
- Ice.ConnectionI connection = new Ice.ConnectionI(_instance, transceiver, ci.endpoint.compress(false),
- null, ci.threadPerConnection);
+ Ice.ConnectionI connection = new Ice.ConnectionI(_instance, transceiver, ci.endpoint.compress(false),null);
java.util.List<Ice.ConnectionI> connectionList = _connections.get(ci);
if(connectionList == null)
@@ -897,10 +880,7 @@ public final class OutgoingConnectionFactory
// If the connection is finished, we remove it right away instead of
// waiting for the reaping.
//
- // NOTE: it's possible for the connection to not be finished yet. That's
- // for instance the case when using thread per connection and if it's the
- // thread which is calling back the outgoing connection factory to notify
- // it of the failure.
+ // NOTE: it's possible for the connection to not be finished yet.
//
synchronized(this)
{
@@ -957,46 +937,39 @@ public final class OutgoingConnectionFactory
private static class ConnectorInfo
{
- public ConnectorInfo(Connector c, EndpointI e, boolean t)
+ public ConnectorInfo(Connector c, EndpointI e)
{
connector = c;
endpoint = e;
- threadPerConnection = t;
}
public boolean
equals(Object obj)
{
ConnectorInfo r = (ConnectorInfo)obj;
- if(threadPerConnection != r.threadPerConnection)
- {
- return false;
- }
return connector.equals(r.connector);
}
public int
hashCode()
{
- return 2 * connector.hashCode() + (threadPerConnection ? 0 : 1);
+ return connector.hashCode();
}
public Connector connector;
public EndpointI endpoint;
- public boolean threadPerConnection;
}
private static class ConnectCallback implements Ice.ConnectionI.StartCallback, EndpointI_connectors
{
ConnectCallback(OutgoingConnectionFactory f, java.util.List<EndpointI> endpoints, boolean more,
- CreateConnectionCallback cb, Ice.EndpointSelectionType selType, boolean threadPerConnection)
+ CreateConnectionCallback cb, Ice.EndpointSelectionType selType)
{
_factory = f;
_endpoints = endpoints;
_hasMore = more;
_callback = cb;
_selType = selType;
- _threadPerConnection = threadPerConnection;
_endpointsIter = _endpoints.iterator();
}
@@ -1063,7 +1036,7 @@ public final class OutgoingConnectionFactory
java.util.Iterator<Connector> q = cons.iterator();
while(q.hasNext())
{
- _connectors.add(new ConnectorInfo(q.next(), _currentEndpoint, _threadPerConnection));
+ _connectors.add(new ConnectorInfo(q.next(), _currentEndpoint));
}
if(_endpointsIter.hasNext())
@@ -1183,7 +1156,7 @@ public final class OutgoingConnectionFactory
{
assert(_iter.hasNext());
_current = _iter.next();
- connection = _factory.createConnection(_current.connector.connect(0), _current);
+ connection = _factory.createConnection(_current.connector.connect(), _current);
connection.start(this);
}
catch(Ice.LocalException ex)
@@ -1197,7 +1170,6 @@ public final class OutgoingConnectionFactory
private final CreateConnectionCallback _callback;
private final java.util.List<EndpointI> _endpoints;
private final Ice.EndpointSelectionType _selType;
- private final boolean _threadPerConnection;
private java.util.Iterator<EndpointI> _endpointsIter;
private EndpointI _currentEndpoint;
private java.util.List<ConnectorInfo> _connectors = new java.util.ArrayList<ConnectorInfo>();
diff --git a/java/src/IceInternal/PropertyNames.java b/java/src/IceInternal/PropertyNames.java
index ef861c56542..2d9dafb23f5 100644
--- a/java/src/IceInternal/PropertyNames.java
+++ b/java/src/IceInternal/PropertyNames.java
@@ -7,7 +7,7 @@
//
// **********************************************************************
//
-// Generated by makeprops.py from file ../config/PropertyNames.xml, Wed Feb 27 12:48:24 2008
+// Generated by makeprops.py from file ../config/PropertyNames.xml, Mon Mar 3 22:29:56 2008
// IMPORTANT: Do not edit this file -- any edits made here will be lost!
@@ -27,12 +27,11 @@ public final class PropertyNames
new Property("Ice\\.Admin\\.ReplicaGroupId", false, null),
new Property("Ice\\.Admin\\.Router", false, null),
new Property("Ice\\.Admin\\.ProxyOptions", false, null),
- new Property("Ice\\.Admin\\.ThreadPerConnection", false, null),
- new Property("Ice\\.Admin\\.ThreadPerConnection\\.StackSize", false, null),
new Property("Ice\\.Admin\\.ThreadPool\\.Size", false, null),
new Property("Ice\\.Admin\\.ThreadPool\\.SizeMax", false, null),
new Property("Ice\\.Admin\\.ThreadPool\\.SizeWarn", false, null),
new Property("Ice\\.Admin\\.ThreadPool\\.StackSize", false, null),
+ new Property("Ice\\.Admin\\.ThreadPool\\.Serialize", false, null),
new Property("Ice\\.Admin\\.DelayCreation", false, null),
new Property("Ice\\.Admin\\.Facets", false, null),
new Property("Ice\\.Admin\\.InstanceName", false, null),
@@ -53,7 +52,6 @@ public final class PropertyNames
new Property("Ice\\.Default\\.Locator\\.Router", false, null),
new Property("Ice\\.Default\\.Locator\\.CollocationOptimization", true, "Ice.Default.Locator.CollocationOptimized"),
new Property("Ice\\.Default\\.Locator\\.CollocationOptimized", false, null),
- new Property("Ice\\.Default\\.Locator\\.ThreadPerConnection", false, null),
new Property("Ice\\.Default\\.Locator", false, null),
new Property("Ice\\.Default\\.LocatorCacheTimeout", false, null),
new Property("Ice\\.Default\\.Package", false, null),
@@ -67,7 +65,6 @@ public final class PropertyNames
new Property("Ice\\.Default\\.Router\\.Router", false, null),
new Property("Ice\\.Default\\.Router\\.CollocationOptimization", true, "Ice.Default.Router.CollocationOptimized"),
new Property("Ice\\.Default\\.Router\\.CollocationOptimized", false, null),
- new Property("Ice\\.Default\\.Router\\.ThreadPerConnection", false, null),
new Property("Ice\\.Default\\.Router", false, null),
new Property("Ice\\.IPv4", false, null),
new Property("Ice\\.IPv6", false, null),
@@ -95,16 +92,16 @@ public final class PropertyNames
new Property("Ice\\.ServerIdleTime", false, null),
new Property("Ice\\.StdErr", false, null),
new Property("Ice\\.StdOut", false, null),
- new Property("Ice\\.ThreadPerConnection", false, null),
- new Property("Ice\\.ThreadPerConnection\\.StackSize", false, null),
new Property("Ice\\.ThreadPool\\.Client\\.Size", false, null),
new Property("Ice\\.ThreadPool\\.Client\\.SizeMax", false, null),
new Property("Ice\\.ThreadPool\\.Client\\.SizeWarn", false, null),
new Property("Ice\\.ThreadPool\\.Client\\.StackSize", false, null),
+ new Property("Ice\\.ThreadPool\\.Client\\.Serialize", false, null),
new Property("Ice\\.ThreadPool\\.Server\\.Size", false, null),
new Property("Ice\\.ThreadPool\\.Server\\.SizeMax", false, null),
new Property("Ice\\.ThreadPool\\.Server\\.SizeWarn", false, null),
new Property("Ice\\.ThreadPool\\.Server\\.StackSize", false, null),
+ new Property("Ice\\.ThreadPool\\.Server\\.Serialize", false, null),
new Property("Ice\\.Trace\\.GC", false, null),
new Property("Ice\\.Trace\\.Location", true, "Ice.Trace.Locator"),
new Property("Ice\\.Trace\\.Locator", false, null),
@@ -144,12 +141,11 @@ public final class PropertyNames
new Property("IceBox\\.ServiceManager\\.ReplicaGroupId", false, null),
new Property("IceBox\\.ServiceManager\\.Router", false, null),
new Property("IceBox\\.ServiceManager\\.ProxyOptions", false, null),
- new Property("IceBox\\.ServiceManager\\.ThreadPerConnection", false, null),
- new Property("IceBox\\.ServiceManager\\.ThreadPerConnection\\.StackSize", false, null),
new Property("IceBox\\.ServiceManager\\.ThreadPool\\.Size", false, null),
new Property("IceBox\\.ServiceManager\\.ThreadPool\\.SizeMax", false, null),
new Property("IceBox\\.ServiceManager\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceBox\\.ServiceManager\\.ThreadPool\\.StackSize", false, null),
+ new Property("IceBox\\.ServiceManager\\.ThreadPool\\.Serialize", false, null),
new Property("IceBox\\.Trace\\.ServiceObserver", false, null),
new Property("IceBox\\.UseSharedCommunicator\\.[^\\s]+", false, null),
null
@@ -165,7 +161,6 @@ public final class PropertyNames
new Property("IceBoxAdmin\\.ServiceManager\\.Proxy\\.Router", false, null),
new Property("IceBoxAdmin\\.ServiceManager\\.Proxy\\.CollocationOptimization", true, "IceBoxAdmin.ServiceManager.Proxy.CollocationOptimized"),
new Property("IceBoxAdmin\\.ServiceManager\\.Proxy\\.CollocationOptimized", false, null),
- new Property("IceBoxAdmin\\.ServiceManager\\.Proxy\\.ThreadPerConnection", false, null),
new Property("IceBoxAdmin\\.ServiceManager\\.Proxy", false, null),
null
};
@@ -192,12 +187,11 @@ public final class PropertyNames
new Property("IceGrid\\.Node\\.ReplicaGroupId", false, null),
new Property("IceGrid\\.Node\\.Router", false, null),
new Property("IceGrid\\.Node\\.ProxyOptions", false, null),
- new Property("IceGrid\\.Node\\.ThreadPerConnection", false, null),
- new Property("IceGrid\\.Node\\.ThreadPerConnection\\.StackSize", false, null),
new Property("IceGrid\\.Node\\.ThreadPool\\.Size", false, null),
new Property("IceGrid\\.Node\\.ThreadPool\\.SizeMax", false, null),
new Property("IceGrid\\.Node\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceGrid\\.Node\\.ThreadPool\\.StackSize", false, null),
+ new Property("IceGrid\\.Node\\.ThreadPool\\.Serialize", false, null),
new Property("IceGrid\\.Node\\.AllowRunningServersAsRoot", false, null),
new Property("IceGrid\\.Node\\.AllowEndpointsOverride", false, null),
new Property("IceGrid\\.Node\\.CollocateRegistry", false, null),
@@ -222,7 +216,6 @@ public final class PropertyNames
new Property("IceGrid\\.Node\\.UserAccountMapper\\.Router", false, null),
new Property("IceGrid\\.Node\\.UserAccountMapper\\.CollocationOptimization", true, "IceGrid.Node.UserAccountMapper.CollocationOptimized"),
new Property("IceGrid\\.Node\\.UserAccountMapper\\.CollocationOptimized", false, null),
- new Property("IceGrid\\.Node\\.UserAccountMapper\\.ThreadPerConnection", false, null),
new Property("IceGrid\\.Node\\.UserAccountMapper", false, null),
new Property("IceGrid\\.Node\\.WaitTime", false, null),
new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.AdapterId", false, null),
@@ -233,12 +226,11 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ReplicaGroupId", false, null),
new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.Router", false, null),
new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ProxyOptions", false, null),
- new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPerConnection", false, null),
- new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPerConnection\\.StackSize", false, null),
new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPool\\.Size", false, null),
new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPool\\.SizeMax", false, null),
new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPool\\.StackSize", false, null),
+ new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPool\\.Serialize", false, null),
new Property("IceGrid\\.Registry\\.AdminCryptPasswords", false, null),
new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier\\.EndpointSelection", false, null),
new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier\\.ConnectionCached", false, null),
@@ -248,7 +240,6 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier\\.Router", false, null),
new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier\\.CollocationOptimization", true, "IceGrid.Registry.AdminPermissionsVerifier.CollocationOptimized"),
new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier\\.CollocationOptimized", false, null),
- new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier\\.ThreadPerConnection", false, null),
new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier", false, null),
new Property("IceGrid\\.Registry\\.AdminSessionFilters", false, null),
new Property("IceGrid\\.Registry\\.AdminSessionManager\\.AdapterId", false, null),
@@ -259,12 +250,11 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ReplicaGroupId", false, null),
new Property("IceGrid\\.Registry\\.AdminSessionManager\\.Router", false, null),
new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ProxyOptions", false, null),
- new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPerConnection", false, null),
- new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPerConnection\\.StackSize", false, null),
new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.Size", false, null),
new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.SizeMax", false, null),
new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.StackSize", false, null),
+ new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.Serialize", false, null),
new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.EndpointSelection", false, null),
new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.ConnectionCached", false, null),
new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.PreferSecure", false, null),
@@ -273,7 +263,6 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.Router", false, null),
new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.CollocationOptimization", true, "IceGrid.Registry.AdminSSLPermissionsVerifier.CollocationOptimized"),
new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.CollocationOptimized", false, null),
- new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.ThreadPerConnection", false, null),
new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier", false, null),
new Property("IceGrid\\.Registry\\.Client\\.AdapterId", false, null),
new Property("IceGrid\\.Registry\\.Client\\.Endpoints", false, null),
@@ -283,12 +272,11 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.Client\\.ReplicaGroupId", false, null),
new Property("IceGrid\\.Registry\\.Client\\.Router", false, null),
new Property("IceGrid\\.Registry\\.Client\\.ProxyOptions", false, null),
- new Property("IceGrid\\.Registry\\.Client\\.ThreadPerConnection", false, null),
- new Property("IceGrid\\.Registry\\.Client\\.ThreadPerConnection\\.StackSize", false, null),
new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.Size", false, null),
new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.SizeMax", false, null),
new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.StackSize", false, null),
+ new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.Serialize", false, null),
new Property("IceGrid\\.Registry\\.CryptPasswords", false, null),
new Property("IceGrid\\.Registry\\.Data", false, null),
new Property("IceGrid\\.Registry\\.DefaultTemplates", false, null),
@@ -301,12 +289,11 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.Internal\\.ReplicaGroupId", false, null),
new Property("IceGrid\\.Registry\\.Internal\\.Router", false, null),
new Property("IceGrid\\.Registry\\.Internal\\.ProxyOptions", false, null),
- new Property("IceGrid\\.Registry\\.Internal\\.ThreadPerConnection", false, null),
- new Property("IceGrid\\.Registry\\.Internal\\.ThreadPerConnection\\.StackSize", false, null),
new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.Size", false, null),
new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.SizeMax", false, null),
new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.StackSize", false, null),
+ new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.Serialize", false, null),
new Property("IceGrid\\.Registry\\.NodeSessionTimeout", false, null),
new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.EndpointSelection", false, null),
new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.ConnectionCached", false, null),
@@ -316,7 +303,6 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.Router", false, null),
new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.CollocationOptimization", true, "IceGrid.Registry.PermissionsVerifier.CollocationOptimized"),
new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.CollocationOptimized", false, null),
- new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.ThreadPerConnection", false, null),
new Property("IceGrid\\.Registry\\.PermissionsVerifier", false, null),
new Property("IceGrid\\.Registry\\.ReplicaName", false, null),
new Property("IceGrid\\.Registry\\.ReplicaSessionTimeout", false, null),
@@ -328,12 +314,11 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.Server\\.ReplicaGroupId", false, null),
new Property("IceGrid\\.Registry\\.Server\\.Router", false, null),
new Property("IceGrid\\.Registry\\.Server\\.ProxyOptions", false, null),
- new Property("IceGrid\\.Registry\\.Server\\.ThreadPerConnection", false, null),
- new Property("IceGrid\\.Registry\\.Server\\.ThreadPerConnection\\.StackSize", false, null),
new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.Size", false, null),
new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.SizeMax", false, null),
new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.StackSize", false, null),
+ new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.Serialize", false, null),
new Property("IceGrid\\.Registry\\.SessionFilters", false, null),
new Property("IceGrid\\.Registry\\.SessionManager\\.AdapterId", false, null),
new Property("IceGrid\\.Registry\\.SessionManager\\.Endpoints", false, null),
@@ -343,12 +328,11 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.SessionManager\\.ReplicaGroupId", false, null),
new Property("IceGrid\\.Registry\\.SessionManager\\.Router", false, null),
new Property("IceGrid\\.Registry\\.SessionManager\\.ProxyOptions", false, null),
- new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPerConnection", false, null),
- new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPerConnection\\.StackSize", false, null),
new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.Size", false, null),
new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.SizeMax", false, null),
new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.StackSize", false, null),
+ new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.Serialize", false, null),
new Property("IceGrid\\.Registry\\.SessionTimeout", false, null),
new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.EndpointSelection", false, null),
new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.ConnectionCached", false, null),
@@ -358,7 +342,6 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.Router", false, null),
new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.CollocationOptimization", true, "IceGrid.Registry.SSLPermissionsVerifier.CollocationOptimized"),
new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.CollocationOptimized", false, null),
- new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.ThreadPerConnection", false, null),
new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier", false, null),
new Property("IceGrid\\.Registry\\.Trace\\.Application", false, null),
new Property("IceGrid\\.Registry\\.Trace\\.Adapter", false, null),
@@ -383,12 +366,11 @@ public final class PropertyNames
new Property("IcePatch2\\.ReplicaGroupId", false, null),
new Property("IcePatch2\\.Router", false, null),
new Property("IcePatch2\\.ProxyOptions", false, null),
- new Property("IcePatch2\\.ThreadPerConnection", false, null),
- new Property("IcePatch2\\.ThreadPerConnection\\.StackSize", false, null),
new Property("IcePatch2\\.ThreadPool\\.Size", false, null),
new Property("IcePatch2\\.ThreadPool\\.SizeMax", false, null),
new Property("IcePatch2\\.ThreadPool\\.SizeWarn", false, null),
new Property("IcePatch2\\.ThreadPool\\.StackSize", false, null),
+ new Property("IcePatch2\\.ThreadPool\\.Serialize", false, null),
new Property("IcePatch2\\.Admin\\.AdapterId", true, null),
new Property("IcePatch2\\.Admin\\.Endpoints", true, null),
new Property("IcePatch2\\.Admin\\.Locator", true, null),
@@ -396,8 +378,6 @@ public final class PropertyNames
new Property("IcePatch2\\.Admin\\.RegisterProcess", true, null),
new Property("IcePatch2\\.Admin\\.ReplicaGroupId", true, null),
new Property("IcePatch2\\.Admin\\.Router", true, null),
- new Property("IcePatch2\\.Admin\\.ThreadPerConnection", true, null),
- new Property("IcePatch2\\.Admin\\.ThreadPerConnection\\.StackSize", true, null),
new Property("IcePatch2\\.Admin\\.ThreadPool\\.Size", true, null),
new Property("IcePatch2\\.Admin\\.ThreadPool\\.SizeMax", true, null),
new Property("IcePatch2\\.Admin\\.ThreadPool\\.SizeWarn", true, null),
@@ -453,62 +433,6 @@ public final class PropertyNames
null
};
- public static final Property IceStormProps[] =
- {
- new Property("IceStorm\\.Flush\\.Timeout", false, null),
- new Property("IceStorm\\.InstanceName", false, null),
- new Property("IceStorm\\.Publish\\.AdapterId", false, null),
- new Property("IceStorm\\.Publish\\.Endpoints", false, null),
- new Property("IceStorm\\.Publish\\.Locator", false, null),
- new Property("IceStorm\\.Publish\\.PublishedEndpoints", false, null),
- new Property("IceStorm\\.Publish\\.RegisterProcess", true, null),
- new Property("IceStorm\\.Publish\\.ReplicaGroupId", false, null),
- new Property("IceStorm\\.Publish\\.Router", false, null),
- new Property("IceStorm\\.Publish\\.ProxyOptions", false, null),
- new Property("IceStorm\\.Publish\\.ThreadPerConnection", false, null),
- new Property("IceStorm\\.Publish\\.ThreadPerConnection\\.StackSize", false, null),
- new Property("IceStorm\\.Publish\\.ThreadPool\\.Size", false, null),
- new Property("IceStorm\\.Publish\\.ThreadPool\\.SizeMax", false, null),
- new Property("IceStorm\\.Publish\\.ThreadPool\\.SizeWarn", false, null),
- new Property("IceStorm\\.Publish\\.ThreadPool\\.StackSize", false, null),
- new Property("IceStorm\\.TopicManager\\.AdapterId", false, null),
- new Property("IceStorm\\.TopicManager\\.Endpoints", false, null),
- new Property("IceStorm\\.TopicManager\\.Locator", false, null),
- new Property("IceStorm\\.TopicManager\\.PublishedEndpoints", false, null),
- new Property("IceStorm\\.TopicManager\\.RegisterProcess", true, null),
- new Property("IceStorm\\.TopicManager\\.ReplicaGroupId", false, null),
- new Property("IceStorm\\.TopicManager\\.Router", false, null),
- new Property("IceStorm\\.TopicManager\\.ProxyOptions", false, null),
- new Property("IceStorm\\.TopicManager\\.ThreadPerConnection", false, null),
- new Property("IceStorm\\.TopicManager\\.ThreadPerConnection\\.StackSize", false, null),
- new Property("IceStorm\\.TopicManager\\.ThreadPool\\.Size", false, null),
- new Property("IceStorm\\.TopicManager\\.ThreadPool\\.SizeMax", false, null),
- new Property("IceStorm\\.TopicManager\\.ThreadPool\\.SizeWarn", false, null),
- new Property("IceStorm\\.TopicManager\\.ThreadPool\\.StackSize", false, null),
- new Property("IceStorm\\.TopicManager\\.Proxy\\.EndpointSelection", false, null),
- new Property("IceStorm\\.TopicManager\\.Proxy\\.ConnectionCached", false, null),
- new Property("IceStorm\\.TopicManager\\.Proxy\\.PreferSecure", false, null),
- new Property("IceStorm\\.TopicManager\\.Proxy\\.LocatorCacheTimeout", false, null),
- new Property("IceStorm\\.TopicManager\\.Proxy\\.Locator", false, null),
- new Property("IceStorm\\.TopicManager\\.Proxy\\.Router", false, null),
- new Property("IceStorm\\.TopicManager\\.Proxy\\.CollocationOptimization", true, "IceStorm.TopicManager.Proxy.CollocationOptimized"),
- new Property("IceStorm\\.TopicManager\\.Proxy\\.CollocationOptimized", false, null),
- new Property("IceStorm\\.TopicManager\\.Proxy\\.ThreadPerConnection", false, null),
- new Property("IceStorm\\.TopicManager\\.Proxy", false, null),
- new Property("IceStorm\\.SubscriberPool\\.Size", false, null),
- new Property("IceStorm\\.SubscriberPool\\.SizeMax", false, null),
- new Property("IceStorm\\.SubscriberPool\\.SizeWarn", false, null),
- new Property("IceStorm\\.SubscriberPool\\.StackSize", false, null),
- new Property("IceStorm\\.Trace\\.Flush", false, null),
- new Property("IceStorm\\.Trace\\.Subscriber", false, null),
- new Property("IceStorm\\.Trace\\.SubscriberPool", false, null),
- new Property("IceStorm\\.Trace\\.Topic", false, null),
- new Property("IceStorm\\.Trace\\.TopicManager", false, null),
- new Property("IceStorm\\.Send\\.Timeout", false, null),
- new Property("IceStorm\\.Discard\\.Interval", false, null),
- null
- };
-
public static final Property Glacier2Props[] =
{
new Property("Glacier2\\.AddSSLContext", false, null),
@@ -520,8 +444,6 @@ public final class PropertyNames
new Property("Glacier2\\.Admin\\.RegisterProcess", true, null),
new Property("Glacier2\\.Admin\\.ReplicaGroupId", true, null),
new Property("Glacier2\\.Admin\\.Router", true, null),
- new Property("Glacier2\\.Admin\\.ThreadPerConnection", true, null),
- new Property("Glacier2\\.Admin\\.ThreadPerConnection\\.StackSize", true, null),
new Property("Glacier2\\.Admin\\.ThreadPool\\.Size", true, null),
new Property("Glacier2\\.Admin\\.ThreadPool\\.SizeMax", true, null),
new Property("Glacier2\\.Admin\\.ThreadPool\\.SizeWarn", true, null),
@@ -535,12 +457,11 @@ public final class PropertyNames
new Property("Glacier2\\.Client\\.ReplicaGroupId", false, null),
new Property("Glacier2\\.Client\\.Router", false, null),
new Property("Glacier2\\.Client\\.ProxyOptions", false, null),
- new Property("Glacier2\\.Client\\.ThreadPerConnection", false, null),
- new Property("Glacier2\\.Client\\.ThreadPerConnection\\.StackSize", false, null),
new Property("Glacier2\\.Client\\.ThreadPool\\.Size", false, null),
new Property("Glacier2\\.Client\\.ThreadPool\\.SizeMax", false, null),
new Property("Glacier2\\.Client\\.ThreadPool\\.SizeWarn", false, null),
new Property("Glacier2\\.Client\\.ThreadPool\\.StackSize", false, null),
+ new Property("Glacier2\\.Client\\.ThreadPool\\.Serialize", false, null),
new Property("Glacier2\\.Client\\.AlwaysBatch", false, null),
new Property("Glacier2\\.Client\\.Buffered", false, null),
new Property("Glacier2\\.Client\\.ForwardContext", false, null),
@@ -565,7 +486,6 @@ public final class PropertyNames
new Property("Glacier2\\.PermissionsVerifier\\.Router", false, null),
new Property("Glacier2\\.PermissionsVerifier\\.CollocationOptimization", true, "Glacier2.PermissionsVerifier.CollocationOptimized"),
new Property("Glacier2\\.PermissionsVerifier\\.CollocationOptimized", false, null),
- new Property("Glacier2\\.PermissionsVerifier\\.ThreadPerConnection", false, null),
new Property("Glacier2\\.PermissionsVerifier", false, null),
new Property("Glacier2\\.ReturnClientProxy", false, null),
new Property("Glacier2\\.SSLPermissionsVerifier\\.EndpointSelection", false, null),
@@ -576,7 +496,6 @@ public final class PropertyNames
new Property("Glacier2\\.SSLPermissionsVerifier\\.Router", false, null),
new Property("Glacier2\\.SSLPermissionsVerifier\\.CollocationOptimization", true, "Glacier2.SSLPermissionsVerifier.CollocationOptimized"),
new Property("Glacier2\\.SSLPermissionsVerifier\\.CollocationOptimized", false, null),
- new Property("Glacier2\\.SSLPermissionsVerifier\\.ThreadPerConnection", false, null),
new Property("Glacier2\\.SSLPermissionsVerifier", false, null),
new Property("Glacier2\\.RoutingTable\\.MaxSize", false, null),
new Property("Glacier2\\.Server\\.AdapterId", false, null),
@@ -587,12 +506,11 @@ public final class PropertyNames
new Property("Glacier2\\.Server\\.ReplicaGroupId", false, null),
new Property("Glacier2\\.Server\\.Router", false, null),
new Property("Glacier2\\.Server\\.ProxyOptions", false, null),
- new Property("Glacier2\\.Server\\.ThreadPerConnection", false, null),
- new Property("Glacier2\\.Server\\.ThreadPerConnection\\.StackSize", false, null),
new Property("Glacier2\\.Server\\.ThreadPool\\.Size", false, null),
new Property("Glacier2\\.Server\\.ThreadPool\\.SizeMax", false, null),
new Property("Glacier2\\.Server\\.ThreadPool\\.SizeWarn", false, null),
new Property("Glacier2\\.Server\\.ThreadPool\\.StackSize", false, null),
+ new Property("Glacier2\\.Server\\.ThreadPool\\.Serialize", false, null),
new Property("Glacier2\\.Server\\.AlwaysBatch", false, null),
new Property("Glacier2\\.Server\\.Buffered", false, null),
new Property("Glacier2\\.Server\\.ForwardContext", false, null),
@@ -607,7 +525,6 @@ public final class PropertyNames
new Property("Glacier2\\.SessionManager\\.Router", false, null),
new Property("Glacier2\\.SessionManager\\.CollocationOptimization", true, "Glacier2.SessionManager.CollocationOptimized"),
new Property("Glacier2\\.SessionManager\\.CollocationOptimized", false, null),
- new Property("Glacier2\\.SessionManager\\.ThreadPerConnection", false, null),
new Property("Glacier2\\.SessionManager", false, null),
new Property("Glacier2\\.SSLSessionManager\\.EndpointSelection", false, null),
new Property("Glacier2\\.SSLSessionManager\\.ConnectionCached", false, null),
@@ -617,7 +534,6 @@ public final class PropertyNames
new Property("Glacier2\\.SSLSessionManager\\.Router", false, null),
new Property("Glacier2\\.SSLSessionManager\\.CollocationOptimization", true, "Glacier2.SSLSessionManager.CollocationOptimized"),
new Property("Glacier2\\.SSLSessionManager\\.CollocationOptimized", false, null),
- new Property("Glacier2\\.SSLSessionManager\\.ThreadPerConnection", false, null),
new Property("Glacier2\\.SSLSessionManager", false, null),
new Property("Glacier2\\.SessionTimeout", false, null),
new Property("Glacier2\\.Trace\\.RoutingTable", false, null),
@@ -667,7 +583,6 @@ public final class PropertyNames
IcePatch2Props,
IceSSLProps,
IceStormAdminProps,
- IceStormProps,
Glacier2Props,
FreezeProps,
null
@@ -683,7 +598,6 @@ public final class PropertyNames
"IcePatch2",
"IceSSL",
"IceStormAdmin",
- "IceStorm",
"Glacier2",
"Freeze",
null
diff --git a/java/src/IceInternal/Reference.java b/java/src/IceInternal/Reference.java
index a05becb3c56..5fdb2696e70 100644
--- a/java/src/IceInternal/Reference.java
+++ b/java/src/IceInternal/Reference.java
@@ -83,7 +83,6 @@ public abstract class Reference implements Cloneable
public abstract boolean getCacheConnection();
public abstract boolean getPreferSecure();
public abstract Ice.EndpointSelectionType getEndpointSelection();
- public abstract boolean getThreadPerConnection();
public abstract int getLocatorCacheTimeout();
//
@@ -179,7 +178,6 @@ public abstract class Reference implements Cloneable
public abstract Reference changeCacheConnection(boolean newCache);
public abstract Reference changePreferSecure(boolean newPreferSecure);
public abstract Reference changeEndpointSelection(Ice.EndpointSelectionType newType);
- public abstract Reference changeThreadPerConnection(boolean newTpc);
public abstract Reference changeLocatorCacheTimeout(int newTimeout);
public abstract Reference changeTimeout(int newTimeout);
diff --git a/java/src/IceInternal/ReferenceFactory.java b/java/src/IceInternal/ReferenceFactory.java
index 2e5ed274c90..23e5094dcd0 100644
--- a/java/src/IceInternal/ReferenceFactory.java
+++ b/java/src/IceInternal/ReferenceFactory.java
@@ -647,8 +647,7 @@ public final class ReferenceFactory
"LocatorCacheTimeout",
"Locator",
"Router",
- "CollocationOptimized",
- "ThreadPerConnection"
+ "CollocationOptimized"
};
private void
@@ -717,7 +716,6 @@ public final class ReferenceFactory
boolean cacheConnection = true;
boolean preferSecure = defaultsAndOverrides.defaultPreferSecure;
Ice.EndpointSelectionType endpointSelection = defaultsAndOverrides.defaultEndpointSelection;
- boolean threadPerConnection = _instance.threadPerConnection();
int locatorCacheTimeout = defaultsAndOverrides.defaultLocatorCacheTimeout;
//
@@ -787,9 +785,6 @@ public final class ReferenceFactory
}
}
- property = propertyPrefix + ".ThreadPerConnection";
- threadPerConnection = properties.getPropertyAsIntWithDefault(property, threadPerConnection ? 1 : 0) > 0;
-
property = propertyPrefix + ".LocatorCacheTimeout";
locatorCacheTimeout = properties.getPropertyAsIntWithDefault(property, locatorCacheTimeout);
}
@@ -812,7 +807,6 @@ public final class ReferenceFactory
cacheConnection,
preferSecure,
endpointSelection,
- threadPerConnection,
locatorCacheTimeout));
}
diff --git a/java/src/IceInternal/RequestHandler.java b/java/src/IceInternal/RequestHandler.java
index 2daa83c2325..f45cfe6e9dd 100644
--- a/java/src/IceInternal/RequestHandler.java
+++ b/java/src/IceInternal/RequestHandler.java
@@ -18,11 +18,11 @@ public interface RequestHandler
Ice.ConnectionI sendRequest(Outgoing out)
throws LocalExceptionWrapper;
- void sendAsyncRequest(OutgoingAsync out)
+ boolean sendAsyncRequest(OutgoingAsync out)
throws LocalExceptionWrapper;
boolean flushBatchRequests(BatchOutgoing out);
- void flushAsyncBatchRequests(BatchOutgoingAsync out);
+ boolean flushAsyncBatchRequests(BatchOutgoingAsync out);
Reference getReference();
diff --git a/java/src/IceInternal/RoutableReference.java b/java/src/IceInternal/RoutableReference.java
index 0b89530575f..a16fffe72c6 100644
--- a/java/src/IceInternal/RoutableReference.java
+++ b/java/src/IceInternal/RoutableReference.java
@@ -59,12 +59,6 @@ public class RoutableReference extends Reference
return _endpointSelection;
}
- public final boolean
- getThreadPerConnection()
- {
- return _threadPerConnection;
- }
-
public final int
getLocatorCacheTimeout()
{
@@ -189,18 +183,6 @@ public class RoutableReference extends Reference
}
public Reference
- changeThreadPerConnection(boolean newTpc)
- {
- if(newTpc == _threadPerConnection)
- {
- return this;
- }
- RoutableReference r = (RoutableReference)getInstance().referenceFactory().copy(this);
- r._threadPerConnection = newTpc;
- return r;
- }
-
- public Reference
changeLocatorCacheTimeout(int newTimeout)
{
if(_locatorCacheTimeout == newTimeout)
@@ -390,10 +372,6 @@ public class RoutableReference extends Reference
{
return false;
}
- if(_threadPerConnection != rhs._threadPerConnection)
- {
- return false;
- }
if(_locatorCacheTimeout != rhs._locatorCacheTimeout)
{
return false;
@@ -618,7 +596,6 @@ public class RoutableReference extends Reference
boolean cacheConnection,
boolean prefereSecure,
Ice.EndpointSelectionType endpointSelection,
- boolean threadPerConnection,
int locatorCacheTimeout)
{
super(instance, communicator, identity, context, facet, mode, secure);
@@ -630,7 +607,6 @@ public class RoutableReference extends Reference
_cacheConnection = cacheConnection;
_preferSecure = prefereSecure;
_endpointSelection = endpointSelection;
- _threadPerConnection = threadPerConnection;
_locatorCacheTimeout = locatorCacheTimeout;
_overrideTimeout = false;
_timeout = -1;
@@ -798,7 +774,7 @@ public class RoutableReference extends Reference
// Get an existing connection or create one if there's no
// existing connection to one of the given endpoints.
//
- connection = factory.create(endpoints, false, _threadPerConnection, getEndpointSelection(), compress);
+ connection = factory.create(endpoints, false, getEndpointSelection(), compress);
}
else
{
@@ -818,8 +794,7 @@ public class RoutableReference extends Reference
{
endpoint[0] = endpoints[i];
final boolean more = i != endpoints.length - 1;
- connection = factory.create(endpoint, more, _threadPerConnection, getEndpointSelection(),
- compress);
+ connection = factory.create(endpoint, more, getEndpointSelection(), compress);
break;
}
catch(Ice.LocalException ex)
@@ -870,7 +845,7 @@ public class RoutableReference extends Reference
// Get an existing connection or create one if there's no
// existing connection to one of the given endpoints.
//
- factory.create(endpoints, false, _threadPerConnection, getEndpointSelection(),
+ factory.create(endpoints, false, getEndpointSelection(),
new OutgoingConnectionFactory.CreateConnectionCallback()
{
public void
@@ -905,7 +880,7 @@ public class RoutableReference extends Reference
// connection for one of the endpoints.
//
- factory.create(new EndpointI[]{ endpoints[0] }, true, _threadPerConnection, getEndpointSelection(),
+ factory.create(new EndpointI[]{ endpoints[0] }, true, getEndpointSelection(),
new OutgoingConnectionFactory.CreateConnectionCallback()
{
public void
@@ -939,7 +914,7 @@ public class RoutableReference extends Reference
final boolean more = _i != endpoints.length - 1;
final EndpointI[] endpoint = new EndpointI[]{ endpoints[_i] };
- factory.create(endpoint, more, _threadPerConnection, getEndpointSelection(), this);
+ factory.create(endpoint, more, getEndpointSelection(), this);
}
private int _i = 0;
@@ -1003,7 +978,6 @@ public class RoutableReference extends Reference
private boolean _cacheConnection;
private boolean _preferSecure;
private Ice.EndpointSelectionType _endpointSelection;
- private boolean _threadPerConnection;
private int _locatorCacheTimeout;
private boolean _overrideTimeout;
diff --git a/java/src/IceInternal/Selector.java b/java/src/IceInternal/Selector.java
new file mode 100644
index 00000000000..32b7f36d641
--- /dev/null
+++ b/java/src/IceInternal/Selector.java
@@ -0,0 +1,479 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+public final class Selector
+{
+
+ public
+ Selector(Instance instance, int timeout)
+ {
+ _instance = instance;
+ _timeout = timeout;
+ _interruptCount = 0;
+
+ Network.SocketPair pair = Network.createPipe();
+ _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
+ _fdIntrWrite = pair.sink;
+ try
+ {
+ _selector = java.nio.channels.Selector.open();
+ pair.source.configureBlocking(false);
+ _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ);
+ }
+ catch(java.io.IOException ex)
+ {
+ Ice.SyscallException sys = new Ice.SyscallException();
+ sys.initCause(ex);
+ throw sys;
+ }
+
+ //
+ // The Selector holds a Set representing the selected keys. The
+ // Set reference doesn't change, so we obtain it once here.
+ //
+ _keys = _selector.selectedKeys();
+ }
+
+ public void
+ destroy()
+ {
+ try
+ {
+ _selector.close();
+ }
+ catch(java.io.IOException ex)
+ {
+ }
+ _selector = null;
+
+ try
+ {
+ _fdIntrWrite.close();
+ }
+ catch(java.io.IOException ex)
+ {
+ }
+ _fdIntrWrite = null;
+
+ try
+ {
+ _fdIntrRead.close();
+ }
+ catch(java.io.IOException ex)
+ {
+ }
+ _fdIntrRead = null;
+ }
+
+ public void
+ add(SelectorHandler handler, SocketStatus status)
+ {
+ // Note: we can't support noInterrupt for add() because a channel can't be registered again
+ // with the selector until the previous selection key has been removed from the cancel-key
+ // set of the selector on the next select() operation.
+
+ handler._pendingStatus = status;
+ if(_changes.add(handler))
+ {
+ setInterrupt();
+ }
+ }
+
+ public void
+ update(SelectorHandler handler, SocketStatus newStatus)
+ {
+ // Note: can only be called from the select() thread
+ assert(handler._key != null);
+ handler._key.interestOps(convertStatus(handler.fd(), newStatus));
+ }
+
+ public void
+ remove(SelectorHandler handler)
+ {
+ // Note: we can't support noInterrupt for remove() because a channel can't be registered again
+ // with the selector until the previous selection key has been removed from the cancel-key
+ // set of the selector on the next select() operation.
+
+ handler._pendingStatus = IceInternal.SocketStatus.Finished;
+ if(_changes.add(handler))
+ {
+ setInterrupt();
+ }
+ }
+
+ public void
+ select()
+ throws java.io.IOException
+ {
+ //
+ // If there are still interrupts, selected keys or pending handlers to process,
+ // return immediately.
+ //
+ if(_interrupted || !_keys.isEmpty() || !_pendingHandlers.isEmpty())
+ {
+ return;
+ }
+
+ //
+ // There's nothing left to process, we can now select.
+ //
+ while(true)
+ {
+ try
+ {
+ if(_nextPendingHandlers.isEmpty())
+ {
+ if(_timeout > 0)
+ {
+ _selector.select(_timeout * 1000);
+ }
+ else
+ {
+ _selector.select();
+ }
+ }
+ else
+ {
+ _selector.selectNow();
+
+ java.util.HashSet<SelectorHandler> tmp = _nextPendingHandlers;
+ _nextPendingHandlers = _pendingHandlers;
+ _pendingHandlers = tmp;
+ }
+ }
+ catch(java.nio.channels.CancelledKeyException ex)
+ {
+ // This sometime occurs on Mac OS X, ignore.
+ continue;
+ }
+ catch(java.io.IOException ex)
+ {
+ //
+ // Pressing Ctrl-C causes select() to raise an
+ // IOException, which seems like a JDK bug. We trap
+ // for that special case here and ignore it.
+ // Hopefully we're not masking something important!
+ //
+ if(Network.interrupted(ex))
+ {
+ continue;
+ }
+
+ throw ex;
+ }
+
+ break;
+ }
+ }
+
+ public SelectorHandler
+ getNextSelected()
+ {
+ assert(_interruptCount == 0);
+
+ if(_iter == null && !_keys.isEmpty())
+ {
+ _iter = _keys.iterator();
+ }
+
+ while(_iter != null && _iter.hasNext())
+ {
+ java.nio.channels.SelectionKey key = _iter.next();
+ _iter.remove();
+ SelectorHandler handler = (SelectorHandler)key.attachment();
+ if(handler == null)
+ {
+ assert(_pendingInterruptRead > 0);
+ _pendingInterruptRead -= readInterrupt(_pendingInterruptRead);
+ continue;
+ }
+ else if(handler._key == null || !handler._key.isValid())
+ {
+ continue;
+ }
+ if(handler.hasMoreData())
+ {
+ _pendingHandlers.remove(handler);
+ }
+ return handler;
+ }
+
+ if(_pendingIter == null && !_pendingHandlers.isEmpty())
+ {
+ _pendingIter = _pendingHandlers.iterator();
+ }
+
+ while(_pendingIter != null && _pendingIter.hasNext())
+ {
+ SelectorHandler handler = _pendingIter.next();
+ _pendingIter.remove();
+ if(handler._key == null || !handler._key.isValid() || !handler.hasMoreData())
+ {
+ continue;
+ }
+ return handler;
+ }
+
+ _iter = null;
+ _pendingIter = null;
+ return null;
+ }
+
+ public void
+ hasMoreData(SelectorHandler handler)
+ {
+ _nextPendingHandlers.add(handler);
+ }
+
+ public boolean
+ processInterrupt()
+ {
+ assert(_changes.size() <= _interruptCount);
+
+ if(!_changes.isEmpty())
+ {
+ java.util.Iterator<SelectorHandler> p = _changes.iterator();
+ while(p.hasNext())
+ {
+ SelectorHandler handler = p.next();
+ if(handler._pendingStatus == SocketStatus.Finished)
+ {
+ removeImpl(handler);
+ }
+ else
+ {
+ addImpl(handler, handler._pendingStatus);
+ }
+ clearInterrupt();
+ }
+ _changes.clear();
+
+ //
+ // We call selectNow() to flush the cancelled-key set and ensure handlers can be
+ // added again once this returns.
+ //
+ try
+ {
+ _selector.selectNow();
+ }
+ catch(java.io.IOException ex)
+ {
+ // Ignore.
+ }
+ _iter = null; // Current iterator is invalidated by selectNow()
+ }
+
+ _interrupted = _interruptCount > 0;
+ return _interruptCount == 0; // No more interrupts to process.
+ }
+
+ public boolean
+ checkTimeout()
+ {
+ if(_interruptCount == 0 && _keys.isEmpty() && _pendingHandlers.isEmpty())
+ {
+ if(_timeout <= 0)
+ {
+ //
+ // This is necessary to prevent a busy loop in case of a spurious wake-up which
+ // sometime occurs in the client thread pool when the communicator is destroyed.
+ // If there are too many successive spurious wake-ups, we log an error.
+ //
+ try
+ {
+ Thread.currentThread().sleep(1);
+ }
+ catch(java.lang.InterruptedException ex)
+ {
+ }
+
+ if(++_spuriousWakeUp > 100)
+ {
+ _instance.initializationData().logger.error("spurious selector wake up");
+ }
+ return false;
+ }
+ return true;
+ }
+ else
+ {
+ _spuriousWakeUp = 0;
+ return false;
+ }
+ }
+
+ public boolean
+ isInterrupted()
+ {
+ return _interruptCount > 0;
+ }
+
+ public void
+ setInterrupt()
+ {
+ if(++_interruptCount == 1)
+ {
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
+ buf.put(0, (byte)0);
+ while(buf.hasRemaining())
+ {
+ try
+ {
+ _fdIntrWrite.write(buf);
+ }
+ catch(java.io.IOException ex)
+ {
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
+ }
+ }
+ }
+ }
+
+ public boolean
+ clearInterrupt()
+ {
+ if(--_interruptCount == 0)
+ {
+ //
+ // If the interrupt byte has not been received by the pipe yet, we just increment
+ // _pendingInterruptRead. It will be read when the _fdIntrRead is ready for read.
+ //
+ if(_keys.contains(_fdIntrReadKey))
+ {
+ readInterrupt(1);
+ _keys.remove(_fdIntrReadKey);
+ _iter = null;
+ }
+ else
+ {
+ ++_pendingInterruptRead;
+ }
+ _interrupted = false;
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ private int
+ readInterrupt(int count)
+ {
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(count);
+ try
+ {
+ buf.rewind();
+ int ret = _fdIntrRead.read(buf);
+ assert(ret > 0);
+ return ret;
+ }
+ catch(java.io.IOException ex)
+ {
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
+ }
+ }
+
+ private int
+ convertStatus(java.nio.channels.SelectableChannel fd, SocketStatus status)
+ {
+ if(status == SocketStatus.NeedConnect)
+ {
+ return java.nio.channels.SelectionKey.OP_CONNECT;
+ }
+ else if(status == SocketStatus.NeedRead)
+ {
+ if((fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
+ {
+ return java.nio.channels.SelectionKey.OP_READ;
+ }
+ else
+ {
+ return java.nio.channels.SelectionKey.OP_ACCEPT;
+ }
+ }
+ else
+ {
+ assert(status == SocketStatus.NeedWrite);
+ return java.nio.channels.SelectionKey.OP_WRITE;
+ }
+ }
+
+ private void
+ addImpl(SelectorHandler handler, SocketStatus status)
+ {
+ if(handler._key != null)
+ {
+ handler._key.interestOps(convertStatus(handler.fd(), status));
+ }
+ else
+ {
+ try
+ {
+ handler._key = handler.fd().register(_selector, convertStatus(handler.fd(), status), handler);
+ }
+ catch(java.nio.channels.ClosedChannelException ex)
+ {
+ assert(false);
+ }
+ assert(!_nextPendingHandlers.contains(handler));
+ }
+
+ if(handler.hasMoreData())
+ {
+ _nextPendingHandlers.add(handler);
+ }
+ }
+
+ private void
+ removeImpl(SelectorHandler handler)
+ {
+ _nextPendingHandlers.remove(handler);
+
+ if(handler._key != null)
+ {
+ try
+ {
+ handler._key.cancel();
+ handler._key = null;
+ }
+ catch(java.nio.channels.CancelledKeyException ex)
+ {
+ assert(false);
+ }
+ }
+ }
+
+ final private Instance _instance;
+ final private int _timeout;
+
+ private java.nio.channels.Selector _selector;
+ private java.nio.channels.ReadableByteChannel _fdIntrRead;
+ private java.nio.channels.WritableByteChannel _fdIntrWrite;
+ private java.nio.channels.SelectionKey _fdIntrReadKey;
+
+ private java.util.Set<java.nio.channels.SelectionKey> _keys;
+ private java.util.Iterator<java.nio.channels.SelectionKey> _iter;
+ private java.util.HashSet<SelectorHandler> _changes = new java.util.HashSet<SelectorHandler>();
+
+ private boolean _interrupted;
+ private int _spuriousWakeUp;
+ private int _interruptCount;
+ private int _pendingInterruptRead;
+
+ private java.util.HashSet<SelectorHandler> _pendingHandlers = new java.util.HashSet<SelectorHandler>();
+ private java.util.HashSet<SelectorHandler> _nextPendingHandlers = new java.util.HashSet<SelectorHandler>();
+ private java.util.Iterator<SelectorHandler> _pendingIter;
+};
diff --git a/java/src/IceInternal/SelectorHandler.java b/java/src/IceInternal/SelectorHandler.java
new file mode 100644
index 00000000000..8c139e6030b
--- /dev/null
+++ b/java/src/IceInternal/SelectorHandler.java
@@ -0,0 +1,31 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+abstract class SelectorHandler
+{
+ abstract public java.nio.channels.SelectableChannel fd();
+
+ //
+ // In Java, it's possible that the transceiver reads more data
+ // than what was really asked. If this is the case, hasMoreData()
+ // returns true and the handler read() method should be called
+ // again (without doing a select()). This is handled by the
+ // Selector class (it adds the handler to a separate list of handlers
+ // if this method returns true.)
+ //
+ abstract public boolean hasMoreData();
+
+ //
+ // The _key data member are only for use by the Selector.
+ //
+ protected java.nio.channels.SelectionKey _key;
+ protected SocketStatus _pendingStatus;
+};
diff --git a/java/src/IceInternal/SelectorThread.java b/java/src/IceInternal/SelectorThread.java
index f3a824bfdc0..2753edcef84 100644
--- a/java/src/IceInternal/SelectorThread.java
+++ b/java/src/IceInternal/SelectorThread.java
@@ -11,48 +11,27 @@ package IceInternal;
public class SelectorThread
{
- public interface SocketReadyCallback
+ static public abstract class SocketReadyCallback extends SelectorHandler implements TimerTask
{
- //
- // The selector thread unregisters the callback when socketReady returns SocketStatus.Finished.
- //
- SocketStatus socketReady(boolean finished);
+ abstract public SocketStatus socketReady();
+ abstract public void socketFinished();
//
// The selector thread doesn't unregister the callback when sockectTimeout is called; socketTimeout
// must unregister the callback either explicitly with unregister() or by shutting down the socket
// (if necessary).
//
- void socketTimeout();
+ //abstract void socketTimeout();
+
+ protected int _timeout;
+ protected SocketStatus _status;
}
SelectorThread(Instance instance)
{
_instance = instance;
_destroyed = false;
-
- Network.SocketPair pair = Network.createPipe();
- _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
- _fdIntrWrite = pair.sink;
-
- try
- {
- _selector = java.nio.channels.Selector.open();
- pair.source.configureBlocking(false);
- _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ);
- }
- catch(java.io.IOException ex)
- {
- Ice.SyscallException sys = new Ice.SyscallException();
- sys.initCause(ex);
- throw sys;
- }
-
- //
- // The Selector holds a Set representing the selected keys. The
- // Set reference doesn't change, so we obtain it once here.
- //
- _keys = _selector.selectedKeys();
+ _selector = new Selector(instance, 0);
_thread = new HelperThread();
_thread.start();
@@ -72,34 +51,44 @@ public class SelectorThread
{
assert(!_destroyed);
_destroyed = true;
- setInterrupt();
+ _selector.setInterrupt();
}
public synchronized void
- _register(java.nio.channels.SelectableChannel fd, SocketReadyCallback cb, SocketStatus status, int timeout)
+ _register(SocketReadyCallback cb, SocketStatus status, int timeout)
{
assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories.
assert(status != SocketStatus.Finished);
- SocketInfo info = new SocketInfo(fd, cb, status, timeout);
- _changes.add(info);
- if(info.timeout >= 0)
+ cb._timeout = timeout;
+ cb._status = status;
+ if(cb._timeout >= 0)
{
- _timer.schedule(info, info.timeout);
+ _timer.schedule(cb, cb._timeout);
}
- setInterrupt();
+
+ _selector.add(cb, status);
}
- //
- // Unregister the given file descriptor. The registered callback will be notified with socketReady()
- // upon registration to allow some cleanup to be done.
- //
public synchronized void
- unregister(java.nio.channels.SelectableChannel fd)
+ unregister(SocketReadyCallback cb)
{
+ // Note: unregister should only be called from the socketReady() call-back.
assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories.
- _changes.add(new SocketInfo(fd, null, SocketStatus.Finished, 0));
- setInterrupt();
+
+ _selector.remove(cb);
+ cb._status = SocketStatus.Finished;
+ }
+
+ public synchronized void
+ finish(SocketReadyCallback cb)
+ {
+ assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories.
+
+ _selector.remove(cb);
+
+ _finished.add(cb);
+ _selector.setInterrupt();
}
public void
@@ -117,110 +106,48 @@ public class SelectorThread
}
}
- private void
- clearInterrupt()
- {
- byte b = 0;
-
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
- try
- {
- while(true)
- {
- buf.rewind();
- if(_fdIntrRead.read(buf) != 1)
- {
- break;
- }
-
- b = buf.get(0);
- break;
- }
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
-
- private void
- setInterrupt()
+ public void
+ run()
{
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
- buf.put(0, (byte)0);
- while(buf.hasRemaining())
+ while(true)
{
try
{
- _fdIntrWrite.write(buf);
+ _selector.select();
}
catch(java.io.IOException ex)
{
Ice.SocketException se = new Ice.SocketException();
se.initCause(ex);
- throw se;
+ //throw se;
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ se.printStackTrace(pw);
+ pw.flush();
+ String s = "exception in selector thread:\n" + sw.toString();
+ _instance.initializationData().logger.error(s);
+ continue;
}
- }
- }
-
- public void
- run()
- {
- java.util.Map<java.nio.channels.SelectableChannel, SocketInfo> socketMap =
- new java.util.HashMap<java.nio.channels.SelectableChannel, SocketInfo>();
- java.util.LinkedList<SocketInfo> readyList = new java.util.LinkedList<SocketInfo>();
- java.util.LinkedList<SocketInfo> finishedList = new java.util.LinkedList<SocketInfo>();
- while(true)
- {
- int ret = 0;
+
+ java.util.LinkedList<SocketReadyCallback> readyList = new java.util.LinkedList<SocketReadyCallback>();
+ boolean finished = false;
- while(true)
+ synchronized(this)
{
- try
- {
- ret = _selector.select();
- }
- catch(java.io.IOException ex)
+ _selector.checkTimeout();
+
+ if(_selector.isInterrupted())
{
- //
- // Pressing Ctrl-C causes select() to raise an
- // IOException, which seems like a JDK bug. We trap
- // for that special case here and ignore it.
- // Hopefully we're not masking something important!
- //
- if(Network.interrupted(ex))
+ if(_selector.processInterrupt())
{
continue;
}
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- //throw se;
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- se.printStackTrace(pw);
- pw.flush();
- String s = "exception in selector thread:\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- continue;
- }
-
- break;
- }
-
- assert(readyList.isEmpty() && finishedList.isEmpty());
-
- if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable())
- {
- synchronized(this)
- {
//
// There are two possiblities for an interrupt:
//
// 1. The selector thread has been destroyed.
- // 2. A socket was registered or unregistered.
+ // 2. A callback is being finished
//
//
@@ -231,82 +158,45 @@ public class SelectorThread
break;
}
- //
- // Remove the interrupt channel from the selected key set.
- //
- _keys.remove(_fdIntrReadKey);
-
- clearInterrupt();
- SocketInfo info = _changes.removeFirst();
- if(info.cb != null) // Registration
+ do
{
- try
- {
- info.key = info.fd.register(_selector, convertStatus(info.status), info);
- }
- catch(java.nio.channels.ClosedChannelException ex)
- {
- assert(false);
- }
- assert(!socketMap.containsKey(info.fd));
- socketMap.put(info.fd, info);
- }
- else // Unregistration
- {
- info = socketMap.get(info.fd);
- if(info != null && info.status != SocketStatus.Finished)
- {
- if(info.timeout >= 0)
- {
- _timer.cancel(info);
- }
-
- try
- {
- info.key.cancel();
- }
- catch(java.nio.channels.CancelledKeyException ex)
- {
- assert(false);
- }
- info.status = SocketStatus.Finished;
- readyList.add(info);
- }
+ SocketReadyCallback cb = _finished.removeFirst();
+ cb._status = SocketStatus.Finished;
+ readyList.add(cb);
}
+ while(_selector.clearInterrupt()); // As long as there are interrupts
+ finished = true;
}
- }
- else
- {
- //
- // Examine the selection key set.
- //
- java.util.Iterator<java.nio.channels.SelectionKey> iter = _keys.iterator();
- while(iter.hasNext())
+ else
{
- //
- // Ignore selection keys that have been cancelled or timed out.
- //
- java.nio.channels.SelectionKey key = iter.next();
- iter.remove();
- assert(key != _fdIntrReadKey);
- SocketInfo info = (SocketInfo)key.attachment();
- if(info.timeout >= 0)
+ SocketReadyCallback cb;
+ while((cb = (SocketReadyCallback)_selector.getNextSelected()) != null)
{
- _timer.cancel(info);
+ readyList.add(cb);
}
- assert(key.isValid());
- readyList.add(info);
}
}
- java.util.Iterator<SocketInfo> iter = readyList.iterator();
+ java.util.Iterator<SocketReadyCallback> iter = readyList.iterator();
while(iter.hasNext())
{
- SocketInfo info = iter.next();
- SocketStatus status;
+ SocketStatus status = SocketStatus.Finished;
+ SocketReadyCallback cb = iter.next();
try
{
- status = info.cb.socketReady(info.status == SocketStatus.Finished);
+ if(cb._timeout >= 0)
+ {
+ _timer.cancel(cb);
+ }
+
+ if(finished)
+ {
+ cb.socketFinished();
+ }
+ else
+ {
+ status = cb.socketReady();
+ }
}
catch(Ice.LocalException ex)
{
@@ -320,142 +210,39 @@ public class SelectorThread
status = SocketStatus.Finished;
}
- if(status == SocketStatus.Finished)
- {
- finishedList.add(info);
- }
- else
+ if(status != SocketStatus.Finished)
{
- assert(info.status != SocketStatus.Finished);
- try
- {
- info.status = status;
- info.key.interestOps(convertStatus(status));
- if(info.timeout >= 0)
- {
- _timer.schedule(info, info.timeout);
- }
- }
- catch(java.nio.channels.CancelledKeyException ex)
+ if(cb.hasMoreData())
{
- assert(false);
+ _selector.hasMoreData(cb);
}
- }
- }
- readyList.clear();
-
- if(finishedList.isEmpty())
- {
- continue;
- }
- iter = finishedList.iterator();
- while(iter.hasNext())
- {
- SocketInfo info = iter.next();
- if(info.status != SocketStatus.Finished)
- {
- try
+ if(status != cb._status)
{
- info.key.cancel();
+ synchronized(this)
+ {
+ _selector.update(cb, status);
+ cb._status = status;
+ }
}
- catch(java.nio.channels.CancelledKeyException ex)
+
+ if(cb._timeout >= 0)
{
- //assert(false); // The channel might already be closed at this point so we can't assert.
+ _timer.schedule(cb, cb._timeout);
}
}
- socketMap.remove(info.fd);
}
- finishedList.clear();
}
assert(_destroyed);
- try
- {
- _selector.close();
- }
- catch(java.io.IOException ex)
- {
- // Ignore.
- }
-
- try
- {
- _fdIntrWrite.close();
- }
- catch(java.io.IOException ex)
- {
- //
- // BUGFIX:
- //
- // Ignore this exception. This shouldn't happen
- // but for some reasons the close() call raises
- // "java.io.IOException: No such file or
- // directory" under Linux with JDK 1.4.2.
- //
- }
- _fdIntrWrite = null;
-
- try
- {
- _fdIntrRead.close();
- }
- catch(java.io.IOException ex)
- {
- }
- _fdIntrRead = null;
- }
-
- private int
- convertStatus(SocketStatus status)
- {
- if(status == SocketStatus.NeedConnect)
- {
- return java.nio.channels.SelectionKey.OP_CONNECT;
- }
- else if(status == SocketStatus.NeedRead)
- {
- return java.nio.channels.SelectionKey.OP_READ;
- }
- else
- {
- assert(status == SocketStatus.NeedWrite);
- return java.nio.channels.SelectionKey.OP_WRITE;
- }
+ _selector.destroy();
}
private Instance _instance;
private boolean _destroyed;
- private java.nio.channels.ReadableByteChannel _fdIntrRead;
- private java.nio.channels.SelectionKey _fdIntrReadKey;
- private java.nio.channels.WritableByteChannel _fdIntrWrite;
- private java.nio.channels.Selector _selector;
- private java.util.Set<java.nio.channels.SelectionKey> _keys;
- private java.util.LinkedList<SocketInfo> _changes = new java.util.LinkedList<SocketInfo>();
-
- private final class SocketInfo implements TimerTask
- {
- java.nio.channels.SelectableChannel fd;
- SocketReadyCallback cb;
- SocketStatus status;
- int timeout;
- java.nio.channels.SelectionKey key;
-
- public void
- runTimerTask()
- {
- this.cb.socketTimeout(); // Exceptions will be reported by the timer thread.
- }
-
- SocketInfo(java.nio.channels.SelectableChannel fd, SocketReadyCallback cb, SocketStatus status, int timeout)
- {
- this.fd = fd;
- this.cb = cb;
- this.status = status;
- this.timeout = timeout;
- }
- }
+ private Selector _selector;
+ private java.util.LinkedList<SocketReadyCallback> _finished = new java.util.LinkedList<SocketReadyCallback>();
private final class HelperThread extends Thread
{
diff --git a/java/src/IceInternal/TcpAcceptor.java b/java/src/IceInternal/TcpAcceptor.java
index 72c8b923d06..87e1f619adf 100644
--- a/java/src/IceInternal/TcpAcceptor.java
+++ b/java/src/IceInternal/TcpAcceptor.java
@@ -26,37 +26,9 @@ class TcpAcceptor implements Acceptor
_logger.trace(_traceLevels.networkCat, s);
}
- java.nio.channels.ServerSocketChannel fd;
- java.nio.channels.Selector selector;
- synchronized(this)
- {
- fd = _fd;
- selector = _selector;
- _fd = null;
- _selector = null;
- }
- if(fd != null)
- {
- try
- {
- fd.close();
- }
- catch(java.io.IOException ex)
- {
- // Ignore.
- }
- }
- if(selector != null)
- {
- try
- {
- selector.close();
- }
- catch(java.io.IOException ex)
- {
- // Ignore.
- }
- }
+ assert(_fd != null);
+ Network.closeSocketNoThrow(_fd);
+ _fd = null;
}
public void
@@ -72,60 +44,15 @@ class TcpAcceptor implements Acceptor
}
public Transceiver
- accept(int timeout)
+ accept()
{
java.nio.channels.SocketChannel fd = null;
- while(fd == null)
+ while(true)
{
try
{
fd = _fd.accept();
- if(fd == null)
- {
- if(_selector == null)
- {
- _selector = java.nio.channels.Selector.open();
- }
-
- while(true)
- {
- try
- {
- java.nio.channels.SelectionKey key =
- _fd.register(_selector, java.nio.channels.SelectionKey.OP_ACCEPT);
- if(timeout > 0)
- {
- if(_selector.select(timeout) == 0)
- {
- throw new Ice.TimeoutException();
- }
- }
- else if(timeout == 0)
- {
- if(_selector.selectNow() == 0)
- {
- throw new Ice.TimeoutException();
- }
- }
- else
- {
- _selector.select();
- }
-
- break;
- }
- catch(java.io.IOException ex)
- {
- if(Network.interrupted(ex))
- {
- continue;
- }
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
- }
+ break;
}
catch(java.io.IOException ex)
{
@@ -163,15 +90,6 @@ class TcpAcceptor implements Acceptor
return new TcpTransceiver(_instance, fd, true);
}
- public void
- connectToSelf()
- {
- java.nio.channels.SocketChannel fd = Network.createTcpSocket();
- Network.setBlock(fd, false);
- Network.doConnect(fd, _addr, -1);
- Network.closeSocket(fd);
- }
-
public String
toString()
{
diff --git a/java/src/IceInternal/TcpConnector.java b/java/src/IceInternal/TcpConnector.java
index db4061b1721..99a80dff1db 100644
--- a/java/src/IceInternal/TcpConnector.java
+++ b/java/src/IceInternal/TcpConnector.java
@@ -12,7 +12,7 @@ package IceInternal;
final class TcpConnector implements Connector, java.lang.Comparable
{
public Transceiver
- connect(int timeout)
+ connect()
{
if(_traceLevels.network >= 2)
{
@@ -25,7 +25,7 @@ final class TcpConnector implements Connector, java.lang.Comparable
java.nio.channels.SocketChannel fd = Network.createTcpSocket();
Network.setBlock(fd, false);
Network.setTcpBufSize(fd, _instance.initializationData().properties, _logger);
- boolean connected = Network.doConnect(fd, _addr, timeout);
+ boolean connected = Network.doConnect(fd, _addr);
if(connected)
{
if(_traceLevels.network >= 1)
diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java
index 80730ce6db0..548f2c3a94b 100644
--- a/java/src/IceInternal/TcpTransceiver.java
+++ b/java/src/IceInternal/TcpTransceiver.java
@@ -19,9 +19,9 @@ final class TcpTransceiver implements Transceiver
}
public SocketStatus
- initialize(int timeout)
+ initialize()
{
- if(_state == StateNeedConnect && timeout == 0)
+ if(_state == StateNeedConnect)
{
_state = StateConnectPending;
return SocketStatus.NeedConnect;
@@ -30,7 +30,7 @@ final class TcpTransceiver implements Transceiver
{
try
{
- Network.doFinishConnect(_fd, timeout);
+ Network.doFinishConnect(_fd);
_state = StateConnected;
_desc = Network.fdToString(_fd);
}
@@ -63,82 +63,10 @@ final class TcpTransceiver implements Transceiver
_logger.trace(_traceLevels.networkCat, s);
}
- synchronized(this)
- {
- assert(_fd != null);
- if(_readSelector != null)
- {
- try
- {
- _readSelector.close();
- }
- catch(java.io.IOException ex)
- {
- // Ignore.
- }
- _readSelector = null;
- }
- if(_writeSelector != null)
- {
- try
- {
- _writeSelector.close();
- }
- catch(java.io.IOException ex)
- {
- // Ignore.
- }
- _writeSelector = null;
- }
- try
- {
- _fd.close();
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- finally
- {
- _fd = null;
- }
- }
- }
-
- public void
- shutdownWrite()
- {
- if(_state < StateConnected)
- {
- return;
- }
-
- if(_traceLevels.network >= 2)
- {
- String s = "shutting down tcp connection for writing\n" + toString();
- _logger.trace(_traceLevels.networkCat, s);
- }
-
assert(_fd != null);
- java.net.Socket socket = _fd.socket();
try
{
- socket.shutdownOutput(); // Shutdown socket for writing
- }
- catch(java.net.SocketException ex)
- {
- //
- // Ignore. We can't reliably figure out if the socket
- // exception is because the socket is not connected.
- //
- // if(!Network.notConnected(ex))
- // {
- // Ice.SocketException se = new Ice.SocketException();
- // se.initCause(ex);
- // throw se;
- // }
+ _fd.close();
}
catch(java.io.IOException ex)
{
@@ -146,107 +74,85 @@ final class TcpTransceiver implements Transceiver
se.initCause(ex);
throw se;
}
- }
-
- public void
- shutdownReadWrite()
- {
- if(_state < StateConnected)
- {
- return;
- }
-
- if(_traceLevels.network >= 2)
- {
- String s = "shutting down tcp connection for reading and writing\n" + toString();
- _logger.trace(_traceLevels.networkCat, s);
- }
-
- assert(_fd != null);
- java.net.Socket socket = _fd.socket();
- try
+ finally
{
- socket.shutdownInput(); // Shutdown socket for reading
- socket.shutdownOutput(); // Shutdown socket for writing
- }
- catch(java.net.SocketException ex)
- {
- //
- // Ignore. We can't reliably figure out if the socket
- // exception is because the socket is not connected.
- //
- // if(!Network.notConnected(ex))
- // {
- // Ice.SocketException se = new Ice.SocketException();
- // se.initCause(ex);
- // throw se;
- // }
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
+ _fd = null;
}
}
-
+
public boolean
- write(Buffer buf, int timeout)
+ write(Buffer buf)
{
- while(writeBuffer(buf.b))
+ final int size = buf.b.limit();
+ int packetSize = size - buf.b.position();
+ if(_maxPacketSize > 0 && packetSize > _maxPacketSize)
{
- //
- // There is more data to write but the socket would block; now we
- // must deal with timeouts.
- //
- assert(buf.b.hasRemaining());
+ packetSize = _maxPacketSize;
+ buf.b.limit(buf.b.position() + packetSize);
+ }
- if(timeout == 0)
- {
- return false;
- }
-
+ while(buf.b.hasRemaining())
+ {
try
{
- if(_writeSelector == null)
+ assert(_fd != null);
+ int ret = _fd.write(buf.b);
+
+ if(ret == -1)
{
- _writeSelector = java.nio.channels.Selector.open();
- _fd.register(_writeSelector, java.nio.channels.SelectionKey.OP_WRITE, null);
+ throw new Ice.ConnectionLostException();
}
-
- try
+ else if(ret == 0)
{
- if(timeout > 0)
- {
- long start = IceInternal.Time.currentMonotonicTimeMillis();
- int n = _writeSelector.select(timeout);
- if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout)
- {
- throw new Ice.TimeoutException();
- }
- }
- else
+ //
+ // Writing would block, so we reset the limit (if necessary) and return true to indicate
+ // that more data must be sent.
+ //
+ if(packetSize == _maxPacketSize)
{
- _writeSelector.select();
+ buf.b.limit(size);
}
+ return false;
+ }
+
+ if(_traceLevels.network >= 3)
+ {
+ String s = "sent " + ret + " of " + size + " bytes via tcp\n" + toString();
+ _logger.trace(_traceLevels.networkCat, s);
}
- catch(java.io.InterruptedIOException ex)
+
+ if(_stats != null)
{
- // Ignore.
+ _stats.bytesSent(type(), ret);
+ }
+
+ if(packetSize == _maxPacketSize)
+ {
+ assert(buf.b.position() == buf.b.limit());
+ packetSize = size - buf.b.position();
+ if(packetSize > _maxPacketSize)
+ {
+ packetSize = _maxPacketSize;
+ }
+ buf.b.limit(buf.b.position() + packetSize);
}
}
+ catch(java.io.InterruptedIOException ex)
+ {
+ continue;
+ }
catch(java.io.IOException ex)
{
Ice.SocketException se = new Ice.SocketException();
se.initCause(ex);
throw se;
}
- }
+ }
return true;
}
public boolean
- read(Buffer buf, int timeout, Ice.BooleanHolder moreData)
+ read(Buffer buf, Ice.BooleanHolder moreData)
{
int remaining = 0;
if(_traceLevels.network >= 3)
@@ -269,39 +175,7 @@ final class TcpTransceiver implements Transceiver
if(ret == 0)
{
- if(timeout == 0)
- {
- return false;
- }
-
- if(_readSelector == null)
- {
- _readSelector = java.nio.channels.Selector.open();
- _fd.register(_readSelector, java.nio.channels.SelectionKey.OP_READ, null);
- }
-
- try
- {
- if(timeout > 0)
- {
- long start = IceInternal.Time.currentMonotonicTimeMillis();
- int n = _readSelector.select(timeout);
- if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout)
- {
- throw new Ice.TimeoutException();
- }
- }
- else
- {
- _readSelector.select();
- }
- }
- catch(java.io.InterruptedIOException ex)
- {
- // Ignore.
- }
-
- continue;
+ return false;
}
if(ret > 0)
@@ -398,86 +272,12 @@ final class TcpTransceiver implements Transceiver
super.finalize();
}
- private boolean
- writeBuffer(java.nio.ByteBuffer buf)
- {
- final int size = buf.limit();
- int packetSize = size - buf.position();
- if(_maxPacketSize > 0 && packetSize > _maxPacketSize)
- {
- packetSize = _maxPacketSize;
- buf.limit(buf.position() + packetSize);
- }
-
- while(buf.hasRemaining())
- {
- try
- {
- assert(_fd != null);
- int ret = _fd.write(buf);
-
- if(ret == -1)
- {
- throw new Ice.ConnectionLostException();
- }
- else if(ret == 0)
- {
- //
- // Writing would block, so we reset the limit (if necessary) and return true to indicate
- // that more data must be sent.
- //
- if(packetSize == _maxPacketSize)
- {
- buf.limit(size);
- }
- return true;
- }
-
- if(_traceLevels.network >= 3)
- {
- String s = "sent " + ret + " of " + size + " bytes via tcp\n" + toString();
- _logger.trace(_traceLevels.networkCat, s);
- }
-
- if(_stats != null)
- {
- _stats.bytesSent(type(), ret);
- }
-
- if(packetSize == _maxPacketSize)
- {
- assert(buf.position() == buf.limit());
- packetSize = size - buf.position();
- if(packetSize > _maxPacketSize)
- {
- packetSize = _maxPacketSize;
- }
- buf.limit(buf.position() + packetSize);
- }
- }
- catch(java.io.InterruptedIOException ex)
- {
- continue;
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
-
- return false; // No more data to send.
- }
-
private java.nio.channels.SocketChannel _fd;
private TraceLevels _traceLevels;
private Ice.Logger _logger;
private Ice.Stats _stats;
private String _desc;
private int _state;
- private java.nio.channels.Selector _readSelector;
- private java.nio.channels.Selector _writeSelector;
private int _maxPacketSize;
private static final int StateNeedConnect = 0;
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index f38ea735b6f..6f19615ecf0 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -26,11 +26,13 @@ public final class ThreadPool
_destroyed = false;
_prefix = prefix;
_timeout = timeout;
+ _selector = new Selector(instance, timeout);
_threadIndex = 0;
_running = 0;
_inUse = 0;
_load = 1.0;
_promote = true;
+ _serialize = _instance.initializationData().properties.getPropertyAsInt(_prefix + ".Serialize") > 0;
_warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
String programName = _instance.initializationData().properties.getProperty("Ice.ProgramName");
@@ -43,29 +45,6 @@ public final class ThreadPool
_programNamePrefix = "";
}
- Network.SocketPair pair = Network.createPipe();
- _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
- _fdIntrWrite = pair.sink;
-
- try
- {
- _selector = java.nio.channels.Selector.open();
- pair.source.configureBlocking(false);
- _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ);
- }
- catch(java.io.IOException ex)
- {
- Ice.SyscallException sys = new Ice.SyscallException();
- sys.initCause(ex);
- throw sys;
- }
-
- //
- // The Selector holds a Set representing the selected keys. The
- // Set reference doesn't change, so we obtain it once here.
- //
- _keys = _selector.selectedKeys();
-
//
// We use just one thread as the default. This is the fastest
// possible setting, still allows one level of nesting, and
@@ -144,53 +123,71 @@ public final class ThreadPool
}
assert(!_destroyed);
- assert(_handlerMap.isEmpty());
_destroyed = true;
- setInterrupt();
+ _selector.setInterrupt();
}
public synchronized void
- _register(java.nio.channels.SelectableChannel fd, EventHandler handler)
+ _register(EventHandler handler)
{
- if(TRACE_REGISTRATION)
+ assert(!_destroyed);
+
+ if(!handler._registered)
{
- trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd);
+ if(TRACE_REGISTRATION)
+ {
+ trace("adding handler of type " + handler.getClass().getName() + " for channel " + handler.fd());
+ }
+
+ if(!handler._serializing)
+ {
+ _selector.add(handler, SocketStatus.NeedRead);
+ }
+ handler._registered = true;
}
- assert(!_destroyed);
- _changes.add(new FdHandlerPair(fd, handler));
- setInterrupt();
}
public synchronized void
- unregister(java.nio.channels.SelectableChannel fd)
+ unregister(EventHandler handler)
{
- if(TRACE_REGISTRATION)
+ assert(!_destroyed);
+ if(handler._registered)
{
- if(TRACE_STACK_TRACE)
+ if(TRACE_REGISTRATION)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- try
- {
- throw new RuntimeException();
- }
- catch(RuntimeException ex)
- {
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- }
- trace("removing handler for channel " + fd + "\n" + sw.toString());
+ trace("removing handler for channel " + handler.fd());
}
- else
+
+ if(!handler._serializing)
{
- trace("removing handler for channel " + fd);
+ _selector.remove(handler);
}
+ handler._registered = false;
}
+ }
+ public synchronized void
+ finish(EventHandler handler)
+ {
assert(!_destroyed);
- _changes.add(new FdHandlerPair(fd, null));
- setInterrupt();
- }
+
+ if(TRACE_REGISTRATION)
+ {
+ trace("finishing handler for channel " + handler.fd());
+ }
+
+ if(handler._registered)
+ {
+ if(!handler._serializing)
+ {
+ _selector.remove(handler);
+ }
+ handler._registered = false;
+ }
+
+ _finished.add(handler);
+ _selector.setInterrupt();
+ }
public synchronized void
execute(ThreadPoolWorkItem workItem)
@@ -200,16 +197,25 @@ public final class ThreadPool
throw new Ice.CommunicatorDestroyedException();
}
_workItems.add(workItem);
- setInterrupt();
+ _selector.setInterrupt();
}
public void
- promoteFollower()
+ promoteFollower(EventHandler handler)
{
if(_sizeMax > 1)
{
synchronized(this)
{
+ if(_serialize && handler != null)
+ {
+ handler._serializing = true;
+ if(handler._registered)
+ {
+ _selector.remove(handler);
+ }
+ }
+
assert(!_promote);
_promote = true;
notify();
@@ -280,65 +286,9 @@ public final class ThreadPool
}
//
- // Cleanup the selector, and the socket pair.
+ // Destroy the selector
//
- try
- {
- if(_selector != null)
- {
- try
- {
- _selector.close();
- }
- catch(java.io.IOException ex)
- {
- //
- // BUGFIX:
- //
- // Ignore this exception. This shouldn't happen
- // but for some reasons the close() call raises
- // "java.io.IOException: Bad file descriptor" on
- // Mac OS X 10.3.x (it works fine on OS X 10.4.x)
- //
- }
- _selector = null;
- }
-
- if(_fdIntrWrite != null)
- {
- try
- {
- _fdIntrWrite.close();
- }
- catch(java.io.IOException ex)
- {
- //
- // BUGFIX:
- //
- // Ignore this exception. This shouldn't happen
- // but for some reasons the close() call raises
- // "java.io.IOException: No such file or
- // directory" under Linux with JDK 1.4.2.
- //
- }
- _fdIntrWrite = null;
- }
-
- if(_fdIntrRead != null)
- {
- _fdIntrRead.close();
- _fdIntrRead = null;
- }
- }
- catch(java.io.IOException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "' while calling close():\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- }
+ _selector.destroy();
}
public String
@@ -346,91 +296,6 @@ public final class ThreadPool
{
return _prefix;
}
-
- private void
- clearInterrupt()
- {
- if(TRACE_INTERRUPT)
- {
- trace("clearInterrupt");
- if(TRACE_STACK_TRACE)
- {
- try
- {
- throw new RuntimeException();
- }
- catch(RuntimeException ex)
- {
- ex.printStackTrace();
- }
- }
- }
-
- byte b = 0;
-
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
- try
- {
- while(true)
- {
- buf.rewind();
- if(_fdIntrRead.read(buf) != 1)
- {
- break;
- }
-
- if(TRACE_INTERRUPT)
- {
- trace("clearInterrupt got byte " + (int)buf.get(0));
- }
-
- b = buf.get(0);
- break;
- }
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
-
- private void
- setInterrupt()
- {
- if(TRACE_INTERRUPT)
- {
- trace("setInterrupt()");
- if(TRACE_STACK_TRACE)
- {
- try
- {
- throw new RuntimeException();
- }
- catch(RuntimeException ex)
- {
- ex.printStackTrace();
- }
- }
- }
-
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
- buf.put(0, (byte)0);
- while(buf.hasRemaining())
- {
- try
- {
- _fdIntrWrite.write(buf);
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
- }
//
// Each thread supplies a BasicStream, to avoid creating excessive
@@ -465,206 +330,85 @@ public final class ThreadPool
while(true)
{
- if(TRACE_REGISTRATION)
- {
- java.util.Set<java.nio.channels.SelectionKey> keys = _selector.keys();
- trace("selecting on " + keys.size() + " channels:");
- java.util.Iterator<java.nio.channels.SelectionKey> i = keys.iterator();
- while(i.hasNext())
- {
- java.nio.channels.SelectionKey key = i.next();
- trace(" " + key.channel());
- }
- }
-
- EventHandler handler = null;
- ThreadPoolWorkItem workItem = null;
-
- //
- // Only call select() if there are no pending handlers with additional data
- // for us to read.
- //
- if(!_pendingHandlers.isEmpty())
+ try
{
- handler = _pendingHandlers.removeFirst();
+ _selector.select();
}
- else
+ catch(java.io.IOException ex)
{
- select();
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ //throw se;
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ se.printStackTrace(pw);
+ pw.flush();
+ String s = "exception in `" + _prefix + "':\n" + sw.toString();
+ _instance.initializationData().logger.error(s);
+ continue;
}
+ EventHandler handler = null;
+ ThreadPoolWorkItem workItem = null;
boolean finished = false;
boolean shutdown = false;
- if(handler == null)
+ synchronized(this)
{
- synchronized(this)
+ if(_selector.checkTimeout())
+ {
+ assert(_timeout > 0);
+ shutdown = true;
+ }
+ else if(_selector.isInterrupted())
{
- if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout.
+ if(_selector.processInterrupt())
{
- if(TRACE_SELECT)
- {
- trace("timeout");
- }
-
- assert(_timeout > 0);
- _timeout = 0;
- shutdown = true;
+ continue;
+ }
+
+ //
+ // There are three possiblities for an interrupt:
+ //
+ // 1. The thread pool has been destroyed.
+ //
+ // 2. An event handler is being finished.
+ //
+ // 3. A work item has been scheduled.
+ //
+
+ if(!_finished.isEmpty())
+ {
+ _selector.clearInterrupt();
+ handler = _finished.removeFirst();
+ finished = true;
+ }
+ else if(!_workItems.isEmpty())
+ {
+ //
+ // Work items must be executed first even if the thread pool is destroyed.
+ //
+ _selector.clearInterrupt();
+ workItem = _workItems.removeFirst();
+ }
+ else if(_destroyed)
+ {
+ //
+ // Don't clear the interrupt if destroyed, so that the other threads exit as well.
+ //
+ return true;
}
else
{
- if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable())
- {
- if(TRACE_SELECT || TRACE_INTERRUPT)
- {
- trace("detected interrupt");
- }
-
- //
- // There are three possiblities for an interrupt:
- //
- // 1. The thread pool has been destroyed.
- //
- // 2. An event handler was registered or unregistered.
- //
- // 3. A work item has been scheduled.
- //
-
- if(!_workItems.isEmpty())
- {
- //
- // Work items must be executed first even if the thread pool is destroyed.
- //
-
- //
- // Remove the interrupt channel from the selected key set.
- //
- _keys.remove(_fdIntrReadKey);
- clearInterrupt();
- assert(!_workItems.isEmpty());
- workItem = _workItems.removeFirst();
- }
- else if(_destroyed)
- {
- if(TRACE_SHUTDOWN)
- {
- trace("destroyed, thread id = " + Thread.currentThread());
- }
-
- //
- // Don't clear the interrupt fd if destroyed, so that the other threads exit as well.
- //
- return true;
- }
- else
- {
- //
- // Remove the interrupt channel from the selected key set.
- //
- _keys.remove(_fdIntrReadKey);
- clearInterrupt();
-
- //
- // An event handler must have been registered or unregistered.
- //
- assert(!_changes.isEmpty());
- FdHandlerPair change = _changes.removeFirst();
-
- if(change.handler != null) // Addition if handler is set.
- {
- int op;
- if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
- {
- op = java.nio.channels.SelectionKey.OP_READ;
- }
- else
- {
- op = java.nio.channels.SelectionKey.OP_ACCEPT;
- }
-
- java.nio.channels.SelectionKey key = null;
- try
- {
- key = change.fd.register(_selector, op, change.handler);
- }
- catch(java.nio.channels.ClosedChannelException ex)
- {
- assert(false);
- }
- _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
-
- //
- // If the handler is readable and already has some data to read add it
- // to the _pendingHandlers list to ensure it will be processed.
- //
- if(change.handler.readable() && change.handler.hasMoreData())
- {
- _pendingHandlers.add(change.handler);
- }
-
- if(TRACE_REGISTRATION)
- {
- trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
- change.fd);
- }
-
- continue;
- }
- else // Removal if handler is not set.
- {
- HandlerKeyPair pair = _handlerMap.remove(change.fd);
- assert(pair != null);
- handler = pair.handler;
- finished = true;
- pair.key.cancel();
-
- if(TRACE_REGISTRATION)
- {
- trace("removed handler (" + handler.getClass().getName() + ") for fd " +
- change.fd);
- }
-
- // Don't continue; we have to call
- // finished() on the event handler below,
- // outside the thread synchronization.
- }
- }
- }
- else
- {
- java.nio.channels.SelectionKey key = null;
- java.util.Iterator<java.nio.channels.SelectionKey> iter = _keys.iterator();
- while(iter.hasNext())
- {
- //
- // Ignore selection keys that have been cancelled
- //
- java.nio.channels.SelectionKey k = iter.next();
- iter.remove();
- if(k.isValid() && k != _fdIntrReadKey)
- {
- if(TRACE_SELECT)
- {
- trace("found a key: " + keyToString(k));
- }
-
- key = k;
- break;
- }
- }
-
- if(key == null)
- {
- if(TRACE_SELECT)
- {
- trace("didn't find a valid key");
- }
-
- continue;
- }
-
- handler = (EventHandler)key.attachment();
- }
+ assert(false);
+ }
+ }
+ else
+ {
+ handler = (EventHandler)_selector.getNextSelected();
+ if(handler == null)
+ {
+ continue;
}
}
}
@@ -675,11 +419,6 @@ public final class ThreadPool
if(shutdown)
{
- if(TRACE_SHUTDOWN)
- {
- trace("shutdown detected");
- }
-
//
// Initiate server shutdown.
//
@@ -693,7 +432,7 @@ public final class ThreadPool
continue;
}
- promoteFollower();
+ promoteFollower(null);
factory.shutdown();
//
@@ -732,8 +471,7 @@ public final class ThreadPool
if(finished)
{
//
- // Notify a handler about its removal from
- // the thread pool.
+ // Notify a handler about its removal from the thread pool.
//
try
{
@@ -773,14 +511,10 @@ public final class ThreadPool
{
continue; // Can't read without blocking.
}
-
- //
- // If the handler has more data to process add it to the _pendingHandlers list
- // to ensure it will be processed.
- //
+
if(handler.hasMoreData())
{
- _pendingHandlers.add(handler);
+ _selector.hasMoreData(handler);
}
}
catch(Ice.TimeoutException ex)
@@ -871,6 +605,15 @@ public final class ThreadPool
{
if(!_destroyed)
{
+ if(_serialize && handler != null && handler._serializing)
+ {
+ if(handler._registered)
+ {
+ _selector.add(handler, SocketStatus.NeedRead);
+ }
+ handler._serializing = false;
+ }
+
//
// First we reap threads that have been
// destroyed before.
@@ -1146,84 +889,6 @@ public final class ThreadPool
*/
private void
- select()
- {
- int ret = 0;
- int spuriousWakeUp = 0;
- while(true)
- {
- try
- {
- if(TRACE_SELECT)
- {
- trace("select on " + _selector.keys().size() + " keys, thread id = " + Thread.currentThread());
- }
-
- if(_timeout > 0)
- {
- ret = _selector.select(_timeout * 1000);
- }
- else
- {
- ret = _selector.select();
- }
- }
- catch(java.io.IOException ex)
- {
- //
- // Pressing Ctrl-C causes select() to raise an
- // IOException, which seems like a JDK bug. We trap
- // for that special case here and ignore it.
- // Hopefully we're not masking something important!
- //
- if(Network.interrupted(ex))
- {
- continue;
- }
-
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- //throw se;
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- se.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "':\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- continue;
- }
-
- if(TRACE_SELECT)
- {
- trace("select() returned " + ret + ", _keys.size() = " + _keys.size());
- }
-
- if(ret == 0 && _timeout <= 0)
- {
- //
- // This is necessary to prevent a busy loop in case of a spurious wake-up which
- // sometime occurs in the client thread pool when the communicator is destroyed.
- // If there are too many successive spurious wake-ups, we log an error.
- //
- try
- {
- Thread.currentThread().sleep(1);
- }
- catch(java.lang.InterruptedException ex)
- {
- }
-
- if(++spuriousWakeUp > 100)
- {
- _instance.initializationData().logger.error("spurious selector wake up in `" + _prefix + "'");
- }
- }
-
- break;
- }
- }
-
- private void
trace(String msg)
{
System.err.println(_prefix + ": " + msg);
@@ -1253,57 +918,15 @@ public final class ThreadPool
return key.channel() + " " + ops;
}
- private static final class FdHandlerPair
- {
- java.nio.channels.SelectableChannel fd;
- EventHandler handler;
-
- FdHandlerPair(java.nio.channels.SelectableChannel fd, EventHandler handler)
- {
- this.fd = fd;
- this.handler = handler;
- }
- }
-
- private static final class HandlerKeyPair
- {
- EventHandler handler;
- java.nio.channels.SelectionKey key;
-
- HandlerKeyPair(EventHandler handler, java.nio.channels.SelectionKey key)
- {
- this.handler = handler;
- this.key = key;
- }
- }
-
private Instance _instance;
private boolean _destroyed;
private final String _prefix;
private final String _programNamePrefix;
-
- private java.nio.channels.ReadableByteChannel _fdIntrRead;
- private java.nio.channels.SelectionKey _fdIntrReadKey;
- private java.nio.channels.WritableByteChannel _fdIntrWrite;
- private java.nio.channels.Selector _selector;
- private java.util.Set<java.nio.channels.SelectionKey> _keys;
-
- private java.util.LinkedList<FdHandlerPair> _changes = new java.util.LinkedList<FdHandlerPair>();
+ private final Selector _selector;
private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>();
-
- private java.util.Map<java.nio.channels.SelectableChannel, HandlerKeyPair> _handlerMap =
- new java.util.HashMap<java.nio.channels.SelectableChannel, HandlerKeyPair>();
-
+ private java.util.LinkedList<EventHandler> _finished = new java.util.LinkedList<EventHandler>();
private int _timeout;
- //
- // Since the Java5 SSL transceiver can read more data from the socket than is
- // actually requested, we have to keep a separate list of handlers that need
- // the thread pool to read more data before it re-enters a blocking call to
- // select().
- //
- private java.util.LinkedList<EventHandler> _pendingHandlers = new java.util.LinkedList<EventHandler>();
-
private final class EventHandlerThread extends Thread
{
EventHandlerThread(String name)
@@ -1377,6 +1000,7 @@ public final class ThreadPool
private final int _size; // Number of threads that are pre-created.
private final int _sizeMax; // Maximum number of threads.
private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.
+ private final boolean _serialize; // True if requests need to be serialized over the connection.
private java.util.List<EventHandlerThread> _threads; // All threads, running or not.
private int _threadIndex; // For assigning thread names.
diff --git a/java/src/IceInternal/Transceiver.java b/java/src/IceInternal/Transceiver.java
index 7bdd2369ba8..ff76dcc85e6 100644
--- a/java/src/IceInternal/Transceiver.java
+++ b/java/src/IceInternal/Transceiver.java
@@ -24,40 +24,28 @@ public interface Transceiver
// socket is ready for reading or writing and until it returns
// SocketStatus.Finished.
//
- SocketStatus initialize(int timeout);
+ SocketStatus initialize();
void close();
- void shutdownWrite();
- void shutdownReadWrite();
//
// Write data.
//
- // Returns true if all the data was written, false otherwise. If
- // timeout is -1, this operation will block until all the data is
- // written. If timeout is 0, it will return when the write can't
- // be completed without blocking. If the timeout is > 0, it will
- // block until all the data is written or the specified timeout
- // expires.
+ // Returns true if all the data was written, false otherwise.
//
- boolean write(Buffer buf, int timeout);
+ boolean write(Buffer buf);
//
// Read data.
//
// Returns true if all the requested data was read, false otherwise.
- // If timeout is -1, this operation will block until all the data
- // is read. If timeout is 0, it will return when the read can't be
- // completed without blocking. If the timeout is > 0, it will
- // block until all the data is read or the specified timeout
- // expires.
//
// NOTE: In Java, read() returns a boolean in moreData to indicate
// whether the transceiver has read more data than requested.
// If moreData is true, read should be called again without
// calling select on the FD.
//
- boolean read(Buffer buf, int timeout, Ice.BooleanHolder moreData);
+ boolean read(Buffer buf, Ice.BooleanHolder moreData);
String type();
String toString();
diff --git a/java/src/IceInternal/UdpConnector.java b/java/src/IceInternal/UdpConnector.java
index f9c5b6a7a52..3fbad48782b 100644
--- a/java/src/IceInternal/UdpConnector.java
+++ b/java/src/IceInternal/UdpConnector.java
@@ -12,7 +12,7 @@ package IceInternal;
final class UdpConnector implements Connector, java.lang.Comparable
{
public Transceiver
- connect(int timeout)
+ connect()
{
return new UdpTransceiver(_instance, _addr, _mcastInterface, _mcastTtl);
}
diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java
index 62000f84b37..1410829c30c 100644
--- a/java/src/IceInternal/UdpTransceiver.java
+++ b/java/src/IceInternal/UdpTransceiver.java
@@ -19,7 +19,7 @@ final class UdpTransceiver implements Transceiver
}
public SocketStatus
- initialize(int timeout)
+ initialize()
{
//
// Nothing to do.
@@ -27,73 +27,29 @@ final class UdpTransceiver implements Transceiver
return SocketStatus.Finished;
}
- public synchronized void
+ public void
close()
{
- //
- // NOTE: closeSocket() may have already been invoked by shutdownReadWrite().
- //
- closeSocket();
-
- if(_readSelector != null)
- {
- try
- {
- _readSelector.close();
- }
- catch(java.io.IOException ex)
- {
- // Ignore.
- }
- _readSelector = null;
- }
-
- if(_writeSelector != null)
+ assert(_fd != null);
+
+ if(_traceLevels.network >= 1)
{
- try
- {
- _writeSelector.close();
- }
- catch(java.io.IOException ex)
- {
- // Ignore.
- }
- _writeSelector = null;
+ String s = "closing udp connection\n" + toString();
+ _logger.trace(_traceLevels.networkCat, s);
}
- }
-
- public void
- shutdownWrite()
- {
- //
- // NOTE: DatagramSocket does not support shutdownOutput.
- //
- }
-
- public synchronized void
- shutdownReadWrite()
- {
- //
- // NOTE: DatagramSocket does not support shutdownInput, and we
- // cannot use the C++ technique of sending a "wakeup" packet to
- // this socket because the Java implementation deadlocks when we
- // call disconnect() while receive() is in progress. Therefore
- // we close the socket here and wake up the selector.
- //
- closeSocket();
-
- if(_readSelector != null)
+
+ try
{
- _readSelector.wakeup();
+ _fd.close();
}
- if(_writeSelector != null)
+ catch(java.io.IOException ex)
{
- _writeSelector.wakeup();
}
+ _fd = null;
}
public boolean
- write(Buffer buf, int timeout)
+ write(Buffer buf)
{
assert(buf.b.position() == 0);
final int packetSize = java.lang.Math.min(_maxPacketSize, _sndSize - _udpOverhead);
@@ -115,41 +71,7 @@ final class UdpTransceiver implements Transceiver
if(ret == 0)
{
- if(timeout == 0)
- {
- return false;
- }
-
- synchronized(this)
- {
- if(_writeSelector == null)
- {
- _writeSelector = java.nio.channels.Selector.open();
- _fd.register(_writeSelector, java.nio.channels.SelectionKey.OP_WRITE, null);
- }
- }
-
- try
- {
- if(timeout > 0)
- {
- long start = IceInternal.Time.currentMonotonicTimeMillis();
- int n = _writeSelector.select(timeout);
- if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout)
- {
- throw new Ice.TimeoutException();
- }
- }
- else
- {
- _writeSelector.select();
- }
- }
- catch(java.io.InterruptedIOException ex)
- {
- // Ignore.
- }
- continue;
+ return false;
}
if(_traceLevels.network >= 3)
@@ -194,7 +116,7 @@ final class UdpTransceiver implements Transceiver
}
public boolean
- read(Buffer buf, int timeout, Ice.BooleanHolder moreData)
+ read(Buffer buf, Ice.BooleanHolder moreData)
{
assert(buf.b.position() == 0);
moreData.value = false;
@@ -218,60 +140,13 @@ final class UdpTransceiver implements Transceiver
int ret = 0;
while(true)
{
- //
- // Check for shutdown.
- //
- java.nio.channels.DatagramChannel fd = null;
- synchronized(this)
- {
- if(_fd == null)
- {
- throw new Ice.ConnectionLostException();
- }
- fd = _fd;
- }
-
try
{
- java.net.InetSocketAddress sender = (java.net.InetSocketAddress)fd.receive(buf.b);
+ java.net.InetSocketAddress sender = (java.net.InetSocketAddress)_fd.receive(buf.b);
if(sender == null || buf.b.position() == 0)
{
- if(timeout == 0)
- {
- return false;
- }
-
- synchronized(this)
- {
- if(_readSelector == null)
- {
- _readSelector = java.nio.channels.Selector.open();
- _fd.register(_readSelector, java.nio.channels.SelectionKey.OP_READ, null);
- }
- }
-
- try
- {
- if(timeout > 0)
- {
- long start = IceInternal.Time.currentMonotonicTimeMillis();
- int n = _readSelector.select(timeout);
- if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout)
- {
- throw new Ice.TimeoutException();
- }
- }
- else
- {
- _readSelector.select();
- }
- }
- catch(java.io.InterruptedIOException ex)
- {
- // Ignore.
- }
- continue;
+ return false;
}
ret = buf.b.position();
@@ -282,7 +157,7 @@ final class UdpTransceiver implements Transceiver
// If we must connect, then we connect to the first peer that
// sends us a packet.
//
- Network.doConnect(fd, sender, -1);
+ Network.doConnect(_fd, sender);
_connect = false; // We're connected now
if(_traceLevels.network >= 1)
@@ -400,7 +275,7 @@ final class UdpTransceiver implements Transceiver
_fd = Network.createUdpSocket();
setBufSize(instance);
Network.setBlock(_fd, false);
- Network.doConnect(_fd, _addr, -1);
+ Network.doConnect(_fd, _addr);
_connect = false; // We're connected now
if(_addr.getAddress().isMulticastAddress())
{
@@ -633,23 +508,6 @@ final class UdpTransceiver implements Transceiver
private void
closeSocket()
{
- if(_fd != null)
- {
- if(_traceLevels.network >= 1)
- {
- String s = "closing udp connection\n" + toString();
- _logger.trace(_traceLevels.networkCat, s);
- }
-
- try
- {
- _fd.close();
- }
- catch(java.io.IOException ex)
- {
- }
- _fd = null;
- }
}
protected synchronized void
@@ -671,8 +529,6 @@ final class UdpTransceiver implements Transceiver
private int _sndSize;
private java.nio.channels.DatagramChannel _fd;
private java.net.InetSocketAddress _addr;
- private java.nio.channels.Selector _readSelector;
- private java.nio.channels.Selector _writeSelector;
private boolean mcastServer = false;
//
diff --git a/java/src/IceInternal/UnknownEndpointI.java b/java/src/IceInternal/UnknownEndpointI.java
index 6e940af8ed7..63cff5902ab 100644
--- a/java/src/IceInternal/UnknownEndpointI.java
+++ b/java/src/IceInternal/UnknownEndpointI.java
@@ -360,12 +360,6 @@ final class UnknownEndpointI extends EndpointI
return 0;
}
- public boolean
- requiresThreadPerConnection()
- {
- return false;
- }
-
private void
calcHashValue()
{
diff --git a/java/src/IceSSL/AcceptorI.java b/java/src/IceSSL/AcceptorI.java
index de5be3781a3..aadf7316c50 100644
--- a/java/src/IceSSL/AcceptorI.java
+++ b/java/src/IceSSL/AcceptorI.java
@@ -26,30 +26,9 @@ final class AcceptorI implements IceInternal.Acceptor
_logger.trace(_instance.networkTraceCategory(), s);
}
- java.nio.channels.ServerSocketChannel fd;
- java.nio.channels.Selector selector;
- synchronized(this)
- {
- fd = _fd;
- selector = _selector;
- _fd = null;
- _selector = null;
- }
- if(fd != null)
- {
- IceInternal.Network.closeSocketNoThrow(fd);
- }
- if(selector != null)
- {
- try
- {
- selector.close();
- }
- catch(java.io.IOException ex)
- {
- // Ignore.
- }
- }
+ assert(_fd != null);
+ IceInternal.Network.closeSocketNoThrow(_fd);
+ _fd = null;
}
public void
@@ -65,7 +44,7 @@ final class AcceptorI implements IceInternal.Acceptor
}
public IceInternal.Transceiver
- accept(int timeout)
+ accept()
{
//
// The plugin may not be fully initialized.
@@ -83,52 +62,6 @@ final class AcceptorI implements IceInternal.Acceptor
try
{
fd = _fd.accept();
- if(fd == null)
- {
- if(_selector == null)
- {
- _selector = java.nio.channels.Selector.open();
- }
-
- while(true)
- {
- try
- {
- java.nio.channels.SelectionKey key =
- _fd.register(_selector, java.nio.channels.SelectionKey.OP_ACCEPT);
- if(timeout > 0)
- {
- if(_selector.select(timeout) == 0)
- {
- throw new Ice.TimeoutException();
- }
- }
- else if(timeout == 0)
- {
- if(_selector.selectNow() == 0)
- {
- throw new Ice.TimeoutException();
- }
- }
- else
- {
- _selector.select();
- }
-
- break;
- }
- catch(java.io.IOException ex)
- {
- if(IceInternal.Network.interrupted(ex))
- {
- continue;
- }
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
- }
}
catch(java.io.IOException ex)
{
@@ -178,15 +111,6 @@ final class AcceptorI implements IceInternal.Acceptor
return new TransceiverI(_instance, engine, fd, "", true, true, _adapterName);
}
- public void
- connectToSelf()
- {
- java.nio.channels.SocketChannel fd = IceInternal.Network.createTcpSocket();
- IceInternal.Network.setBlock(fd, false);
- IceInternal.Network.doConnect(fd, _addr, -1);
- IceInternal.Network.closeSocketNoThrow(fd);
- }
-
public String
toString()
{
@@ -258,5 +182,4 @@ final class AcceptorI implements IceInternal.Acceptor
private java.nio.channels.ServerSocketChannel _fd;
private int _backlog;
private java.net.InetSocketAddress _addr;
- private java.nio.channels.Selector _selector;
}
diff --git a/java/src/IceSSL/ConnectorI.java b/java/src/IceSSL/ConnectorI.java
index 91f170685b0..370237563e8 100644
--- a/java/src/IceSSL/ConnectorI.java
+++ b/java/src/IceSSL/ConnectorI.java
@@ -12,7 +12,7 @@ package IceSSL;
final class ConnectorI implements IceInternal.Connector, java.lang.Comparable
{
public IceInternal.Transceiver
- connect(int timeout)
+ connect()
{
//
// The plugin may not be fully initialized.
@@ -35,7 +35,7 @@ final class ConnectorI implements IceInternal.Connector, java.lang.Comparable
java.nio.channels.SocketChannel fd = IceInternal.Network.createTcpSocket();
IceInternal.Network.setBlock(fd, false);
IceInternal.Network.setTcpBufSize(fd, _instance.communicator().getProperties(), _logger);
- boolean connected = IceInternal.Network.doConnect(fd, _addr, timeout);
+ boolean connected = IceInternal.Network.doConnect(fd, _addr);
try
{
javax.net.ssl.SSLEngine engine = _instance.createSSLEngine(false);
diff --git a/java/src/IceSSL/TransceiverI.java b/java/src/IceSSL/TransceiverI.java
index eb983f4b59a..3ef8a5c63d7 100644
--- a/java/src/IceSSL/TransceiverI.java
+++ b/java/src/IceSSL/TransceiverI.java
@@ -22,42 +22,29 @@ final class TransceiverI implements IceInternal.Transceiver
return _fd;
}
- //
- // All methods that can write to the socket are synchronized.
- //
- public synchronized IceInternal.SocketStatus
- initialize(int timeout)
+ public IceInternal.SocketStatus
+ initialize()
{
try
{
- if(_state == StateNeedConnect && timeout == 0)
+ if(_state == StateNeedConnect)
{
_state = StateConnectPending;
return IceInternal.SocketStatus.NeedConnect;
}
else if(_state <= StateConnectPending)
{
- IceInternal.Network.doFinishConnect(_fd, timeout);
+ IceInternal.Network.doFinishConnect(_fd);
_state = StateConnected;
_desc = IceInternal.Network.fdToString(_fd);
}
assert(_state == StateConnected);
- IceInternal.SocketStatus status;
- do
+ IceInternal.SocketStatus status = handshakeNonBlocking();
+ if(status != IceInternal.SocketStatus.Finished)
{
- status = handshakeNonBlocking();
- if(timeout == 0)
- {
- return status;
- }
-
- if(status != IceInternal.SocketStatus.Finished)
- {
- handleSocketStatus(status, timeout);
- }
+ return status;
}
- while(status != IceInternal.SocketStatus.Finished);
}
catch(Ice.LocalException ex)
{
@@ -72,10 +59,6 @@ final class TransceiverI implements IceInternal.Transceiver
return IceInternal.SocketStatus.Finished;
}
- //
- // All methods that can write to the socket are synchronized.
- //
-
public void
close()
{
@@ -87,47 +70,60 @@ final class TransceiverI implements IceInternal.Transceiver
assert(_fd != null);
- if(_readSelector != null)
+ if(_state >= StateConnected)
{
try
{
- _readSelector.close();
+ //
+ // Send the close_notify message.
+ //
+ _engine.closeOutbound();
+ _netOutput.clear();
+ while(!_engine.isOutboundDone())
+ {
+ _engine.wrap(_emptyBuffer, _netOutput);
+ try
+ {
+ //
+ // Note: we can't block to send the close_notify message. In some cases, the
+ // close_notify message might therefore not be receieved by the peer. This is
+ // not a big issue since the Ice protocol isn't subject to truncation attacks.
+ //
+ flushNonBlocking();
+ }
+ catch(Ice.LocalException ex)
+ {
+ // Ignore.
+ }
+ }
}
- catch(java.io.IOException ex)
+ catch(SSLException ex)
{
- // Ignore.
+ //
+ // We can't throw in close.
+ //
+ // Ice.SecurityException se = new Ice.SecurityException();
+ // se.reason = "IceSSL: SSL failure while shutting down socket";
+ // se.initCause(ex);
+ // throw se;
}
- _readSelector = null;
- }
- if(_writeSelector != null)
- {
try
{
- _writeSelector.close();
+ _engine.closeInbound();
}
- catch(java.io.IOException ex)
+ catch(SSLException ex)
{
- // Ignore.
+ //
+ // SSLEngine always raises an exception with this message:
+ //
+ // Inbound closed before receiving peer's close_notify: possible truncation attack?
+ //
+ // We would probably need to wait for a response in shutdown() to avoid this.
+ // For now, we'll ignore this exception.
+ //
+ //_logger.error("IceSSL: error during close\n" + ex.getMessage());
}
- _writeSelector = null;
- }
-
- try
- {
- _engine.closeInbound();
- }
- catch(SSLException ex)
- {
- //
- // SSLEngine always raises an exception with this message:
- //
- // Inbound closed before receiving peer's close_notify: possible truncation attack?
- //
- // We would probably need to wait for a response in shutdown() to avoid this.
- // For now, we'll ignore this exception.
- //
- //_logger.error("IceSSL: error during close\n" + ex.getMessage());
}
try
@@ -140,104 +136,8 @@ final class TransceiverI implements IceInternal.Transceiver
}
}
- //
- // All methods that can write to the socket are synchronized.
- //
- public synchronized void
- shutdownWrite()
- {
- if(_state < StateConnected)
- {
- return;
- }
-
- if(_instance.networkTraceLevel() >= 2)
- {
- String s = "shutting down ssl connection for writing\n" + toString();
- _logger.trace(_instance.networkTraceCategory(), s);
- }
-
- shutdown();
-
- assert(_fd != null);
- java.net.Socket socket = _fd.socket();
- try
- {
- socket.shutdownOutput(); // Shutdown socket for writing.
- }
- catch(java.net.SocketException ex)
- {
- //
- // Ignore. We can't reliably figure out if the socket
- // exception is because the socket is not connected.
- //
- // if(!IceInternal.Network.notConnected(ex))
- // {
- // Ice.SocketException se = new Ice.SocketException();
- // se.initCause(ex);
- // throw se;
- // }
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
-
- //
- // All methods that can write to the socket are synchronized.
- //
- public synchronized void
- shutdownReadWrite()
- {
- if(_state < StateConnected)
- {
- return;
- }
-
- if(_instance.networkTraceLevel() >= 2)
- {
- String s = "shutting down ssl connection for reading and writing\n" + toString();
- _logger.trace(_instance.networkTraceCategory(), s);
- }
-
- shutdown();
-
- assert(_fd != null);
- java.net.Socket socket = _fd.socket();
- try
- {
- socket.shutdownInput(); // Shutdown socket for reading
- socket.shutdownOutput(); // Shutdown socket for writing
- }
- catch(java.net.SocketException ex)
- {
- //
- // Ignore. We can't reliably figure out if the socket
- // exception is because the socket is not connected.
- //
- // if(!IceInternal.Network.notConnected(ex))
- // {
- // Ice.SocketException se = new Ice.SocketException();
- // se.initCause(ex);
- // throw se;
- // }
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
-
- //
- // All methods that can write to the socket are synchronized.
- //
- public synchronized boolean
- write(IceInternal.Buffer buf, int timeout)
+ public boolean
+ write(IceInternal.Buffer buf)
{
//
// If the handshake isn't completed yet, we shouldn't be writing.
@@ -247,28 +147,17 @@ final class TransceiverI implements IceInternal.Transceiver
throw new Ice.ConnectionLostException();
}
- IceInternal.SocketStatus status;
- do
+ IceInternal.SocketStatus status = writeNonBlocking(buf.b);
+ if(status != IceInternal.SocketStatus.Finished)
{
- status = writeNonBlocking(buf.b);
- if(status != IceInternal.SocketStatus.Finished)
- {
- if(timeout == 0)
- {
- assert(status == IceInternal.SocketStatus.NeedWrite);
- return false;
- }
-
- handleSocketStatus(status, timeout);
- }
- }
- while(status != IceInternal.SocketStatus.Finished);
-
+ assert(status == IceInternal.SocketStatus.NeedWrite);
+ return false;
+ }
return true;
}
public boolean
- read(IceInternal.Buffer buf, int timeout, Ice.BooleanHolder moreData)
+ read(IceInternal.Buffer buf, Ice.BooleanHolder moreData)
{
//
// If the handshake isn't completed yet, we shouldn't be reading (read can be
@@ -324,23 +213,13 @@ final class TransceiverI implements IceInternal.Transceiver
}
case BUFFER_UNDERFLOW:
{
- IceInternal.SocketStatus status;
- do
+ IceInternal.SocketStatus status = readNonBlocking();
+ if(status != IceInternal.SocketStatus.Finished)
{
- status = readNonBlocking();
- if(status != IceInternal.SocketStatus.Finished)
- {
- if(timeout == 0)
- {
- assert(status == IceInternal.SocketStatus.NeedRead);
- moreData.value = false;
- return false;
- }
-
- handleSocketStatus(status, timeout);
- }
- }
- while(status != IceInternal.SocketStatus.Finished);
+ assert(status == IceInternal.SocketStatus.NeedRead);
+ moreData.value = false;
+ return false;
+ }
continue;
}
case CLOSED:
@@ -639,50 +518,6 @@ final class TransceiverI implements IceInternal.Transceiver
private void
shutdown()
{
- //
- // Send the close_notify message.
- //
- _engine.closeOutbound();
- try
- {
- _netOutput.clear();
- while(!_engine.isOutboundDone())
- {
- _engine.wrap(_emptyBuffer, _netOutput);
- try
- {
- //
- // We can't block to send the close_notify message as this is called from
- // shutdownWrite and shutdownReadWrite which aren't suppose to block. In
- // some cases, the close_notify message might therefore not be receieved
- // by the peer. This is not a big issue since the Ice protocol isn't
- // subject to truncation attacks.
- //
-// IceInternal.SocketStatus status;
-// do
-// {
-// status = flushNonBlocking();
-// if(status != IceInternal.SocketStatus.Finished)
-// {
-// handleSocketStatus(status, -1); // TODO: Is waiting indefinitely really correct?
-// }
-// }
-// while(status != IceInternal.SocketStatus.Finished);
- flushNonBlocking();
- }
- catch(Ice.ConnectionLostException ex)
- {
- // Ignore.
- }
- }
- }
- catch(SSLException ex)
- {
- Ice.SecurityException se = new Ice.SecurityException();
- se.reason = "IceSSL: SSL failure while shutting down socket";
- se.initCause(ex);
- throw se;
- }
}
private IceInternal.SocketStatus
@@ -932,74 +767,6 @@ final class TransceiverI implements IceInternal.Transceiver
_appInput.compact();
}
- private void
- handleSocketStatus(IceInternal.SocketStatus status, int timeout)
- {
- assert(timeout != 0);
- try
- {
- java.nio.channels.Selector selector;
- if(status == IceInternal.SocketStatus.NeedRead)
- {
- if(_readSelector == null)
- {
- _readSelector = java.nio.channels.Selector.open();
- _fd.register(_readSelector, java.nio.channels.SelectionKey.OP_READ, null);
- }
- selector = _readSelector;
- }
- else
- {
- assert(status == IceInternal.SocketStatus.NeedWrite);
- if(_writeSelector == null)
- {
- _writeSelector = java.nio.channels.Selector.open();
- _fd.register(_writeSelector, java.nio.channels.SelectionKey.OP_WRITE, null);
- }
- selector = _writeSelector;
- }
-
- while(true)
- {
- try
- {
- if(timeout > 0)
- {
- long start = System.currentTimeMillis();
- int n = selector.select(timeout);
- if(n == 0 && System.currentTimeMillis() >= start + timeout)
- {
- throw new Ice.TimeoutException();
- }
- }
- else
- {
- selector.select();
- }
-
- break;
- }
- catch(java.io.InterruptedIOException ex)
- {
- // Ignore.
- }
- }
- }
- catch(java.io.IOException ex)
- {
- if(IceInternal.Network.connectionLost(ex))
- {
- Ice.ConnectionLostException se = new Ice.ConnectionLostException();
- se.initCause(ex);
- throw se;
- }
-
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
-
private Instance _instance;
private java.nio.channels.SocketChannel _fd;
private javax.net.ssl.SSLEngine _engine;
@@ -1015,8 +782,6 @@ final class TransceiverI implements IceInternal.Transceiver
private ByteBuffer _netInput; // Holds encrypted data read from the socket.
private ByteBuffer _netOutput; // Holds encrypted data to be written to the socket.
private static ByteBuffer _emptyBuffer = ByteBuffer.allocate(0); // Used during handshaking.
- private java.nio.channels.Selector _readSelector;
- private java.nio.channels.Selector _writeSelector;
private ConnectionInfo _info;
private static final int StateNeedConnect = 0;