summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2009-08-21 15:55:01 +0200
committerBenoit Foucher <benoit@zeroc.com>2009-08-21 15:55:01 +0200
commitb9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a (patch)
tree183215e2dbeadfbc871b800ce09726e58af38b91 /java/src
parentadding compression cookbook demo (diff)
downloadice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.bz2
ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.xz
ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.zip
IOCP changes, bug 3501, 4200, 4156, 3101
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/Application.java31
-rw-r--r--java/src/Ice/ConnectionI.java986
-rw-r--r--java/src/Ice/ObjectAdapterI.java5
-rw-r--r--java/src/IceInternal/BasicStream.java47
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java18
-rw-r--r--java/src/IceInternal/ConnectionMonitor.java23
-rw-r--r--java/src/IceInternal/EndpointHostResolver.java31
-rw-r--r--java/src/IceInternal/EventHandler.java65
-rw-r--r--java/src/IceInternal/Ex.java12
-rw-r--r--java/src/IceInternal/IncomingConnectionFactory.java256
-rw-r--r--java/src/IceInternal/Instance.java40
-rw-r--r--java/src/IceInternal/LocalExceptionWrapper.java6
-rw-r--r--java/src/IceInternal/OutgoingAsyncMessageCallback.java14
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java35
-rw-r--r--java/src/IceInternal/PropertyNames.java15
-rw-r--r--java/src/IceInternal/Selector.java534
-rw-r--r--java/src/IceInternal/SelectorHandler.java31
-rw-r--r--java/src/IceInternal/SelectorThread.java307
-rw-r--r--java/src/IceInternal/ServantManager.java8
-rw-r--r--java/src/IceInternal/SocketOperation.java20
-rw-r--r--java/src/IceInternal/SocketStatus.java39
-rw-r--r--java/src/IceInternal/TcpTransceiver.java46
-rw-r--r--java/src/IceInternal/ThreadPool.java1128
-rw-r--r--java/src/IceInternal/ThreadPoolCurrent.java37
-rw-r--r--java/src/IceInternal/ThreadPoolWorkItem.java2
-rw-r--r--java/src/IceInternal/ThreadPoolWorkQueue.java182
-rw-r--r--java/src/IceInternal/Timer.java6
-rw-r--r--java/src/IceInternal/Transceiver.java12
-rw-r--r--java/src/IceInternal/UdpTransceiver.java6
-rw-r--r--java/src/IceSSL/TransceiverI.java60
30 files changed, 1639 insertions, 2363 deletions
diff --git a/java/src/Ice/Application.java b/java/src/Ice/Application.java
index 3bb0cb74205..01b42196f8c 100644
--- a/java/src/Ice/Application.java
+++ b/java/src/Ice/Application.java
@@ -106,12 +106,12 @@ public abstract class Application
}
catch(LocalException ex)
{
- error("", ex);
+ Util.getProcessLogger().error(IceInternal.Ex.toString(ex));
return 1;
}
catch(java.lang.Exception ex)
{
- error("unknown exception", ex);
+ Util.getProcessLogger().error("unknown exception: " + IceInternal.Ex.toString(ex));
return 1;
}
}
@@ -195,12 +195,12 @@ public abstract class Application
}
catch(LocalException ex)
{
- error("", ex);
+ Util.getProcessLogger().error(IceInternal.Ex.toString(ex));
status = 1;
}
catch(java.lang.Exception ex)
{
- error("unknown exception", ex);
+ Util.getProcessLogger().error("unknown exception: " + IceInternal.Ex.toString(ex));
status = 1;
}
catch(java.lang.Error err)
@@ -208,7 +208,7 @@ public abstract class Application
//
// We catch Error to avoid hangs in some non-fatal situations
//
- error("Java error", err);
+ Util.getProcessLogger().error("Java error: " + IceInternal.Ex.toString(err));
status = 1;
}
@@ -254,12 +254,12 @@ public abstract class Application
}
catch(LocalException ex)
{
- error("", ex);
+ Util.getProcessLogger().error(IceInternal.Ex.toString(ex));
status = 1;
}
catch(java.lang.Exception ex)
{
- error("unknown exception", ex);
+ Util.getProcessLogger().error("unknown exception: " + IceInternal.Ex.toString(ex));
status = 1;
}
_communicator = null;
@@ -642,23 +642,6 @@ public abstract class Application
private Thread _hook;
}
- private void
- error(String msg, java.lang.Throwable ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- if(msg.equals(""))
- {
- Util.getProcessLogger().error(sw.toString());
- }
- else
- {
- Util.getProcessLogger().error(msg + ":\n" + sw.toString());
- }
- }
-
private static String _appName;
private static Communicator _communicator;
private static AppHook _appHook;
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 4107621f428..d00ce4499a6 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -17,6 +17,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
void connectionStartFailed(ConnectionI connection, Ice.LocalException ex);
}
+ private class TimeoutCallback implements IceInternal.TimerTask
+ {
+ public void
+ runTimerTask()
+ {
+ timedOut();
+ }
+ }
+
public void
start(StartCallback callback)
{
@@ -24,44 +33,23 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
synchronized(this)
{
- if(_state == StateClosed) // The connection might already be closed if the communicator was destroyed.
+ if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed.
{
assert(_exception != null);
throw _exception;
}
- IceInternal.SocketStatus status = initialize();
- if(status == IceInternal.SocketStatus.Finished)
+ if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None))
{
- status = validate();
- }
-
- if(status != IceInternal.SocketStatus.Finished)
- {
- //
- // If the initialization or validation couldn't be completed without potentially
- // blocking, we register the connection with the selector thread and return.
- //
- int timeout;
- IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideConnectTimeout)
- {
- timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
- }
- else
- {
- timeout = _endpoint.timeout();
- }
-
- _sendInProgress = true;
- _selectorThread._register(_socketReadyCallback, status, timeout);
-
if(callback != null)
{
_startCallback = callback;
return;
}
+ //
+ // Wait for the connection to be validated.
+ //
while(_state <= StateNotValidated)
{
try
@@ -79,6 +67,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
throw _exception;
}
}
+
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
}
}
catch(Ice.LocalException ex)
@@ -192,12 +185,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public synchronized boolean
isFinished()
{
- if(_transceiver != null || _dispatchCount != 0)
+ if(_state != StateFinished || _dispatchCount != 0)
{
return false;
}
- assert(_state == StateClosed);
+ assert(_state == StateFinished);
return true;
}
@@ -251,7 +244,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// Now we must wait until close() has been called on the
// transceiver.
//
- while(_transceiver != null)
+ while(_state != StateFinished)
{
try
{
@@ -296,7 +289,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- assert(_state == StateClosed);
+ assert(_state == StateFinished);
//
// Clear the OA. See bug 1673 for the details of why this is necessary.
@@ -316,9 +309,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// Active connection management for idle connections.
//
if(_acmTimeout <= 0 ||
- !_requests.isEmpty() || !_asyncRequests.isEmpty() ||
- _batchStreamInUse || !_batchStream.isEmpty() ||
- _sendInProgress || _dispatchCount > 0)
+ !_requests.isEmpty() || !_asyncRequests.isEmpty() || _dispatchCount > 0 ||
+ _readStream.size() > IceInternal.Protocol.headerSize || !_writeStream.isEmpty() || !_batchStream.isEmpty())
{
return;
}
@@ -786,7 +778,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
notifyAll();
}
- if(_state == StateClosed)
+ if(_state >= StateClosed)
{
assert(_exception != null);
throw _exception;
@@ -798,11 +790,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
initiateShutdown();
}
-
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
- }
}
catch(LocalException ex)
{
@@ -821,7 +808,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
notifyAll();
}
- if(_state == StateClosed)
+ if(_state >= StateClosed)
{
assert(_exception != null);
throw _exception;
@@ -831,11 +818,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
initiateShutdown();
}
-
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
- }
}
catch(LocalException ex)
{
@@ -897,103 +879,269 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
//
- // Operations from SelectorHandler
+ // Operations from EventHandler
//
- public java.nio.channels.SelectableChannel
- fd()
+ public void
+ message(IceInternal.ThreadPoolCurrent current)
{
- return _transceiver.fd();
- }
+ StartCallback startCB = null;
+ java.util.List<OutgoingMessage> sentCBs = null;
+ MessageInfo info = null;
- public boolean
- hasMoreData()
- {
- return _hasMoreData.value;
- }
+ synchronized(this)
+ {
+ if(_state >= StateClosed)
+ {
+ return;
+ }
- //
- // Operations from EventHandler
- //
+ try
+ {
+ unscheduleTimeout(current.operation);
+ if((current.operation & IceInternal.SocketOperation.Write) != 0 && !_writeStream.isEmpty())
+ {
+ if(!_transceiver.write(_writeStream.getBuffer()))
+ {
+ assert(!_writeStream.isEmpty());
+ scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout());
+ return;
+ }
+ assert(!_writeStream.getBuffer().b.hasRemaining());
+ }
+ if((current.operation & IceInternal.SocketOperation.Read) != 0 && !_readStream.isEmpty())
+ {
+ if(_readStream.size() == IceInternal.Protocol.headerSize) // Read header.
+ {
+ if(!_transceiver.read(_readStream.getBuffer(), _hasMoreData))
+ {
+ return;
+ }
+ assert(!_readStream.getBuffer().b.hasRemaining());
- public boolean
- datagram()
- {
- return _endpoint.datagram(); // No mutex protection necessary, _endpoint is immutable.
- }
+ int pos = _readStream.pos();
+ if(pos < IceInternal.Protocol.headerSize)
+ {
+ //
+ // This situation is possible for small UDP packets.
+ //
+ throw new Ice.IllegalMessageSizeException();
+ }
- public boolean
- readable()
- {
- return true;
- }
+ _readStream.pos(0);
+ byte[] m = new byte[4];
+ m[0] = _readStream.readByte();
+ m[1] = _readStream.readByte();
+ m[2] = _readStream.readByte();
+ m[3] = _readStream.readByte();
+ if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1]
+ || m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
+ {
+ Ice.BadMagicException ex = new Ice.BadMagicException();
+ ex.badMagic = m;
+ throw ex;
+ }
- public boolean
- read(IceInternal.BasicStream stream)
- {
- assert(_transceiver != null);
- return _transceiver.read(stream.getBuffer(), _hasMoreData);
+ byte pMajor = _readStream.readByte();
+ byte pMinor = _readStream.readByte();
+ if(pMajor != IceInternal.Protocol.protocolMajor || pMinor > IceInternal.Protocol.protocolMinor)
+ {
+ Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException();
+ e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor;
+ e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor;
+ e.major = IceInternal.Protocol.protocolMajor;
+ e.minor = IceInternal.Protocol.protocolMinor;
+ throw e;
+ }
- //
- // Updating _acmAbsoluteTimeoutMillis is too expensive here,
- // because we would have to acquire a lock just for this
- // purpose. Instead, we update _acmAbsoluteTimeoutMillis in
- // message().
- //
- }
+ byte eMajor = _readStream.readByte();
+ byte eMinor = _readStream.readByte();
+ if(eMajor != IceInternal.Protocol.encodingMajor || eMinor > IceInternal.Protocol.encodingMinor)
+ {
+ Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException();
+ e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor;
+ e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor;
+ e.major = IceInternal.Protocol.encodingMajor;
+ e.minor = IceInternal.Protocol.encodingMinor;
+ throw e;
+ }
- public void
- message(IceInternal.BasicStream stream, IceInternal.ThreadPool threadPool)
- {
- MessageInfo info = new MessageInfo(stream);
+ _readStream.readByte(); // messageType
+ _readStream.readByte(); // compress
+ int size = _readStream.readInt();
+ if(size < IceInternal.Protocol.headerSize)
+ {
+ throw new Ice.IllegalMessageSizeException();
+ }
+ if(size > _instance.messageSizeMax())
+ {
+ IceInternal.Ex.throwMemoryLimitException(size, _instance.messageSizeMax());
+ }
+ if(size > _readStream.size())
+ {
+ _readStream.resize(size, true);
+ }
+ _readStream.pos(pos);
+ }
- synchronized(this)
- {
- //
- // We must promote within the synchronization, otherwise
- // there could be various race conditions with close
- // connection messages and other messages.
- //
- threadPool.promoteFollower(this);
+ if(_readStream.pos() != _readStream.size())
+ {
+ if(_endpoint.datagram())
+ {
+ if(_warnUdp)
+ {
+ _logger.warning("DatagramLimitException: maximum size of " + _readStream.pos() +
+ " exceeded");
+ }
+ throw new Ice.DatagramLimitException();
+ }
+ else
+ {
+ if(!_transceiver.read(_readStream.getBuffer(), _hasMoreData))
+ {
+ assert(!_readStream.isEmpty());
+ scheduleTimeout(IceInternal.SocketOperation.Read, _endpoint.timeout());
+ return;
+ }
+ assert(!_readStream.getBuffer().b.hasRemaining());
+ }
+ }
+ }
+
+ if(_state <= StateNotValidated)
+ {
+ if(_state == StateNotInitialized && !initialize(current.operation))
+ {
+ return;
+ }
+
+ if(_state <= StateNotValidated && !validate(current.operation))
+ {
+ return;
+ }
+
+ _threadPool.unregister(this, current.operation);
- if(_state != StateClosed)
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
+ startCB = _startCallback;
+ _startCallback = null;
+ }
+ else
+ {
+ assert(_state <= StateClosing);
+
+ if((current.operation & IceInternal.SocketOperation.Write) != 0)
+ {
+ sentCBs = sendNextMessage();
+ }
+
+ if((current.operation & IceInternal.SocketOperation.Read) != 0)
+ {
+ info = parseMessage(current.stream);
+ }
+ }
+ }
+ catch(DatagramLimitException ex) // Expected.
{
- parseMessage(info);
+ _readStream.resize(IceInternal.Protocol.headerSize, true);
+ _readStream.pos(0);
+ return;
}
-
- //
- // parseMessage() can close the connection, so we must check
- // for closed state again.
- //
- if(_state == StateClosed)
+ catch(SocketException ex)
{
+ setState(StateClosed, ex);
return;
}
+ catch(LocalException ex)
+ {
+ if(_endpoint.datagram())
+ {
+ if(_warn)
+ {
+ String s = "datagram connection exception:\n" + ex + '\n' + _desc;
+ _logger.warning(s);
+ }
+ _readStream.resize(IceInternal.Protocol.headerSize, true);
+ _readStream.pos(0);
+ }
+ else
+ {
+ setState(StateClosed, ex);
+ }
+ return;
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
+ }
+
+ current.ioCompleted();
}
//
- // Asynchronous replies must be handled outside the thread
- // synchronization, so that nested calls are possible.
+ // Notify the factory that the connection establishment and
+ // validation has completed.
//
- if(info.outAsync != null)
+ if(startCB != null)
{
- info.outAsync.__finished(info.stream);
+ startCB.connectionStartCompleted(this);
}
//
- // Method invocation (or multiple invocations for batch messages)
- // must be done outside the thread synchronization, so that nested
- // calls are possible.
+ // Notify AMI calls that the message was sent.
//
- invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter);
+ if(sentCBs != null)
+ {
+ for(OutgoingMessage msg : sentCBs)
+ {
+ msg.outAsync.__sent(_instance);
+ }
+ }
+
+ if(info != null)
+ {
+ //
+ // Asynchronous replies must be handled outside the thread
+ // synchronization, so that nested calls are possible.
+ //
+ if(info.outAsync != null)
+ {
+ info.outAsync.__finished(info.stream);
+ }
+
+ if(info.invokeNum > 0)
+ {
+ //
+ // Method invocation (or multiple invocations for batch messages)
+ // must be done outside the thread synchronization, so that nested
+ // calls are possible.
+ //
+ invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager,
+ info.adapter);
+ }
+ }
}
public void
- finished(IceInternal.ThreadPool threadPool)
+ finished(IceInternal.ThreadPoolCurrent current)
{
synchronized(this)
{
- assert(threadPool == _threadPool && _state == StateClosed && !_sendInProgress);
- threadPool.promoteFollower(null);
+ assert(_state == StateClosed);
+ unscheduleTimeout(IceInternal.SocketOperation.Read | IceInternal.SocketOperation.Write);
+ }
+
+ //
+ // If there are no callbacks to call, we don't call ioCompleted() since we're not going
+ // to call code that will potentially block (this avoids promoting a new leader and
+ // unecessary thread creation, especially if this is called on shutdown).
+ //
+ if(_startCallback != null || !_sendStreams.isEmpty() || !_asyncRequests.isEmpty())
+ {
+ current.ioCompleted();
}
if(_startCallback != null)
@@ -1026,43 +1174,38 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
synchronized(this)
{
- try
- {
- _transceiver.close();
- }
- finally
- {
- _transceiver = null;
- notifyAll();
- }
+ setState(StateFinished);
}
}
- public synchronized void
- exception(LocalException ex)
+ public String
+ toString()
{
- setState(StateClosed, ex);
+ return _toString();
}
- public synchronized void
- invokeException(LocalException ex, int invokeNum)
+ public java.nio.channels.SelectableChannel
+ fd()
{
- //
- // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
- // called in case of a fatal exception we decrement _dispatchCount here.
- //
+ return _transceiver.fd();
+ }
- setState(StateClosed, ex);
+ public boolean
+ hasMoreData()
+ {
+ return _hasMoreData.value;
+ }
- if(invokeNum > 0)
+ public synchronized void
+ timedOut()
+ {
+ if(_state <= StateNotValidated)
{
- assert(_dispatchCount > 0);
- _dispatchCount -= invokeNum;
- assert(_dispatchCount >= 0);
- if(_dispatchCount == 0)
- {
- notifyAll();
- }
+ setState(StateClosed, new ConnectTimeoutException());
+ }
+ else if(_state <= StateClosing)
+ {
+ setState(StateClosed, new TimeoutException());
}
}
@@ -1078,10 +1221,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return _endpoint.timeout(); // No mutex protection necessary, _endpoint is immutable.
}
- public String
- toString()
+ //
+ // Only used by the SSL plug-in.
+ //
+ // The external party has to synchronize the connection, since the
+ // connection is the object that protects the transceiver.
+ //
+ public IceInternal.Transceiver
+ getTransceiver()
{
- return _toString();
+ return _transceiver;
}
public String
@@ -1090,121 +1239,38 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return _desc; // No mutex lock, _desc is immutable.
}
- //
- // Operations from SocketReadyCallback
- //
- public IceInternal.SocketStatus
- socketReady()
- {
- StartCallback callback = null;
-
- synchronized(this)
- {
- assert(_sendInProgress);
-
- if(_state == StateClosed)
- {
- return IceInternal.SocketStatus.Finished;
- }
-
- try
- {
- //
- // First, we check if there's something to send. If that's the case, the connection
- // must be active and the only thing to do is send the queued streams.
- //
- if(!_sendStreams.isEmpty())
- {
- if(!send())
- {
- return IceInternal.SocketStatus.NeedWrite;
- }
- assert(_sendStreams.isEmpty());
- }
- else
- {
- if(_state == StateNotInitialized)
- {
- IceInternal.SocketStatus status = initialize();
- if(status != IceInternal.SocketStatus.Finished)
- {
- return status;
- }
- }
-
- if(_state <= StateNotValidated)
- {
- IceInternal.SocketStatus status = validate();
- if(status != IceInternal.SocketStatus.Finished)
- {
- return status;
- }
- }
-
- callback = _startCallback;
- _startCallback = null;
- }
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- return IceInternal.SocketStatus.Finished;
- }
-
- assert(_sendStreams.isEmpty());
- _selectorThread.unregister(_socketReadyCallback);
- _sendInProgress = false;
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
- }
- }
-
- if(callback != null)
- {
- callback.connectionStartCompleted(this);
- }
- return IceInternal.SocketStatus.Finished;
- }
-
public synchronized void
- socketFinished()
+ exception(LocalException ex)
{
- assert(_sendInProgress && _state == StateClosed);
- _sendInProgress = false;
- _threadPool.finish(this);
+ setState(StateClosed, ex);
}
public synchronized void
- socketTimeout()
+ invokeException(LocalException ex, int invokeNum)
{
- if(_state <= StateNotValidated)
- {
- setState(StateClosed, new ConnectTimeoutException());
- }
- else if(_state <= StateClosing)
+ //
+ // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
+ // called in case of a fatal exception we decrement _dispatchCount here.
+ //
+
+ setState(StateClosed, ex);
+
+ if(invokeNum > 0)
{
- setState(StateClosed, new TimeoutException());
+ assert(_dispatchCount > 0);
+ _dispatchCount -= invokeNum;
+ assert(_dispatchCount >= 0);
+ if(_dispatchCount == 0)
+ {
+ notifyAll();
+ }
}
}
- //
- // Only used by the SSL plug-in.
- //
- // The external party has to synchronize the connection, since the
- // connection is the object that protects the transceiver.
- //
- public IceInternal.Transceiver
- getTransceiver()
- {
- return _transceiver;
- }
-
public ConnectionI(IceInternal.Instance instance, IceInternal.Transceiver transceiver,
IceInternal.EndpointI endpoint, ObjectAdapter adapter)
{
- super(instance);
-
+ _instance = instance;
final Ice.InitializationData initData = instance.initializationData();
_transceiver = transceiver;
_desc = transceiver.toString();
@@ -1213,7 +1279,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_adapter = adapter;
_logger = initData.logger; // Cached for better performance.
_traceLevels = instance.traceLevels(); // Cached for better performance.
- _warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false;
+ _timer = instance.timer();
+ _writeTimeout = new TimeoutCallback();
+ _writeTimeoutScheduled = false;
+ _readTimeout = new TimeoutCallback();
+ _readTimeoutScheduled = false;
+ _warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0;
+ _warnUdp = instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
_cacheBuffers = initData.properties.getPropertyAsIntWithDefault("Ice.CacheMessageBuffers", 1) == 1;
_acmAbsoluteTimeoutMillis = 0;
_nextRequestId = 1;
@@ -1223,7 +1295,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_batchRequestNum = 0;
_batchRequestCompress = false;
_batchMarker = 0;
- _sendInProgress = false;
+ _readStream = new IceInternal.BasicStream(instance);
+ _writeStream = new IceInternal.BasicStream(instance);
_dispatchCount = 0;
_state = StateNotInitialized;
_stateTime = IceInternal.Time.currentMonotonicTimeMillis();
@@ -1274,11 +1347,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
_threadPool = _instance.clientThreadPool();
}
-
- _selectorThread = _instance.selectorThread();
-
- _overrideCompress = _instance.defaultsAndOverrides().overrideCompress;
- _overrideCompressValue = _instance.defaultsAndOverrides().overrideCompressValue;
+ _threadPool.initialize(this);
}
catch(Ice.LocalException ex)
{
@@ -1297,8 +1366,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
throws Throwable
{
IceUtilInternal.Assert.FinalizerAssert(_startCallback == null);
- IceUtilInternal.Assert.FinalizerAssert(_state == StateClosed);
- IceUtilInternal.Assert.FinalizerAssert(_transceiver == null);
+ IceUtilInternal.Assert.FinalizerAssert(_state == StateFinished);
IceUtilInternal.Assert.FinalizerAssert(_dispatchCount == 0);
IceUtilInternal.Assert.FinalizerAssert(_sendStreams.isEmpty());
IceUtilInternal.Assert.FinalizerAssert(_requests.isEmpty());
@@ -1313,6 +1381,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private static final int StateHolding = 3;
private static final int StateClosing = 4;
private static final int StateClosed = 5;
+ private static final int StateFinished = 6;
private void
setState(int state, LocalException ex)
@@ -1321,7 +1390,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// If setState() is called with an exception, then only closed
// and closing states are permissible.
//
- assert(state == StateClosing || state == StateClosed);
+ assert(state >= StateClosing);
if(_state == state) // Don't switch twice.
{
@@ -1388,8 +1457,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return;
}
- switch(state)
+ try
{
+ switch(state)
+ {
case StateNotInitialized:
{
assert(false);
@@ -1416,7 +1487,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
return;
}
- _threadPool._register(this);
+ _threadPool.register(this, IceInternal.SocketOperation.Read);
break;
}
@@ -1430,7 +1501,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
return;
}
- _threadPool.unregister(this);
+ if(_state == StateActive)
+ {
+ _threadPool.unregister(this, IceInternal.SocketOperation.Read);
+ }
break;
}
@@ -1439,34 +1513,44 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
// Can't change back from closed.
//
- if(_state == StateClosed)
+ if(_state >= StateClosed)
{
return;
}
- _threadPool._register(this);
+ if(_state == StateHolding)
+ {
+ // We need to continue to read in closing state.
+ _threadPool.register(this, IceInternal.SocketOperation.Read);
+ }
break;
}
case StateClosed:
{
- if(_sendInProgress)
+ if(_state == StateFinished)
{
- //
- // Unregister with both the pool and the selector thread. We unregister with
- // the pool to ensure that it stops reading on the socket (otherwise, if the
- // socket is closed the thread pool would spin always reading 0 from the FD).
- // The selector thread will register again the FD with the pool once it's
- // done.
- //
- _selectorThread.finish(_socketReadyCallback);
- _threadPool.unregister(this);
- }
- else
- {
- _threadPool.finish(this);
+ return;
}
+ _threadPool.finish(this);
+ break;
+ }
+
+ case StateFinished:
+ {
+ assert(_state == StateClosed);
+ _transceiver.close();
break;
}
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "unexpected connection exception:\n " + _desc + "\n" + sw.toString();
+ _instance.initializationData().logger.error(s);
}
//
@@ -1542,13 +1626,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- private IceInternal.SocketStatus
- initialize()
+ private boolean
+ initialize(int operation)
{
- IceInternal.SocketStatus status = _transceiver.initialize();
- if(status != IceInternal.SocketStatus.Finished)
+ int s = _transceiver.initialize();
+ if(s != IceInternal.SocketOperation.None)
{
- return status;
+ scheduleTimeout(s, connectTimeout());
+ _threadPool.update(this, operation, s);
+ return false;
}
//
@@ -1556,53 +1642,55 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
_desc = _transceiver.toString();
setState(StateNotValidated);
- return IceInternal.SocketStatus.Finished;
+ return true;
}
- private IceInternal.SocketStatus
- validate()
+ private boolean
+ validate(int operation)
{
if(!_endpoint.datagram()) // Datagram connections are always implicitly validated.
{
if(_adapter != null) // The server side has the active role for connection validation.
{
- IceInternal.BasicStream os = _stream;
- if(os.size() == 0)
+ if(_writeStream.size() == 0)
{
- os.writeBlob(IceInternal.Protocol.magic);
- os.writeByte(IceInternal.Protocol.protocolMajor);
- os.writeByte(IceInternal.Protocol.protocolMinor);
- os.writeByte(IceInternal.Protocol.encodingMajor);
- os.writeByte(IceInternal.Protocol.encodingMinor);
- os.writeByte(IceInternal.Protocol.validateConnectionMsg);
- os.writeByte((byte)0); // Compression status (always zero for validate connection).
- os.writeInt(IceInternal.Protocol.headerSize); // Message size.
- IceInternal.TraceUtil.traceSend(os, _logger, _traceLevels);
- os.prepareWrite();
+ _writeStream.writeBlob(IceInternal.Protocol.magic);
+ _writeStream.writeByte(IceInternal.Protocol.protocolMajor);
+ _writeStream.writeByte(IceInternal.Protocol.protocolMinor);
+ _writeStream.writeByte(IceInternal.Protocol.encodingMajor);
+ _writeStream.writeByte(IceInternal.Protocol.encodingMinor);
+ _writeStream.writeByte(IceInternal.Protocol.validateConnectionMsg);
+ _writeStream.writeByte((byte)0); // Compression status (always zero for validate connection).
+ _writeStream.writeInt(IceInternal.Protocol.headerSize); // Message size.
+ IceInternal.TraceUtil.traceSend(_writeStream, _logger, _traceLevels);
+ _writeStream.prepareWrite();
}
- if(!_transceiver.write(os.getBuffer()))
+ if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer()))
{
- return IceInternal.SocketStatus.NeedWrite;
+ scheduleTimeout(IceInternal.SocketOperation.Write, connectTimeout());
+ _threadPool.update(this, operation, IceInternal.SocketOperation.Write);
+ return false;
}
}
else // The client side has the passive role for connection validation.
{
- IceInternal.BasicStream is = _stream;
- if(is.size() == 0)
+ if(_readStream.size() == 0)
{
- is.resize(IceInternal.Protocol.headerSize, true);
- is.pos(0);
+ _readStream.resize(IceInternal.Protocol.headerSize, true);
+ _readStream.pos(0);
}
- if(!_transceiver.read(is.getBuffer(), _hasMoreData))
+ if(_readStream.pos() != _readStream.size() && !_transceiver.read(_readStream.getBuffer(), _hasMoreData))
{
- return IceInternal.SocketStatus.NeedRead;
+ scheduleTimeout(IceInternal.SocketOperation.Read, connectTimeout());
+ _threadPool.update(this, operation, IceInternal.SocketOperation.Read);
+ return false;
}
- assert(is.pos() == IceInternal.Protocol.headerSize);
- is.pos(0);
- byte[] m = is.readBlob(4);
+ assert(_readStream.pos() == IceInternal.Protocol.headerSize);
+ _readStream.pos(0);
+ byte[] m = _readStream.readBlob(4);
if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] ||
m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
{
@@ -1610,8 +1698,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
ex.badMagic = m;
throw ex;
}
- byte pMajor = is.readByte();
- byte pMinor = is.readByte();
+ byte pMajor = _readStream.readByte();
+ byte pMinor = _readStream.readByte();
if(pMajor != IceInternal.Protocol.protocolMajor)
{
UnsupportedProtocolException e = new UnsupportedProtocolException();
@@ -1621,8 +1709,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
e.minor = IceInternal.Protocol.protocolMinor;
throw e;
}
- byte eMajor = is.readByte();
- byte eMinor = is.readByte();
+ byte eMajor = _readStream.readByte();
+ byte eMinor = _readStream.readByte();
if(eMajor != IceInternal.Protocol.encodingMajor)
{
UnsupportedEncodingException e = new UnsupportedEncodingException();
@@ -1632,116 +1720,115 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
e.minor = IceInternal.Protocol.encodingMinor;
throw e;
}
- byte messageType = is.readByte();
+ byte messageType = _readStream.readByte();
if(messageType != IceInternal.Protocol.validateConnectionMsg)
{
throw new ConnectionNotValidatedException();
}
- is.readByte(); // Ignore compression status for validate connection.
- int size = is.readInt();
+ _readStream.readByte(); // Ignore compression status for validate connection.
+ int size = _readStream.readInt();
if(size != IceInternal.Protocol.headerSize)
{
throw new IllegalMessageSizeException();
}
- IceInternal.TraceUtil.traceRecv(is, _logger, _traceLevels);
+ IceInternal.TraceUtil.traceRecv(_readStream, _logger, _traceLevels);
}
}
- _stream.reset();
+ _writeStream.resize(0, false);
+ _writeStream.pos(0);
- //
- // We start out in holding state.
- //
- setState(StateHolding);
- return IceInternal.SocketStatus.Finished;
+ _readStream.resize(IceInternal.Protocol.headerSize, true);
+ _readStream.pos(0);
+
+ return true;
}
- private boolean
- send()
+ private java.util.List<OutgoingMessage>
+ sendNextMessage()
{
- assert(_transceiver != null);
assert(!_sendStreams.isEmpty());
+ assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());
- boolean flushSentCallbacks = _sentCallbacks.isEmpty();
+ java.util.List<OutgoingMessage> callbacks = new java.util.LinkedList<OutgoingMessage>();
try
{
- while(!_sendStreams.isEmpty())
+ while(true)
{
+ //
+ // Notify the message that it was sent.
+ //
OutgoingMessage message = _sendStreams.getFirst();
- if(!message.prepared)
+ _writeStream.swap(message.stream);
+ message.sent(this, true);
+ if(message.outAsync instanceof Ice.AMISentCallback)
{
- IceInternal.BasicStream stream = message.stream;
-
- boolean compress = _overrideCompress ? _overrideCompressValue : message.compress;
- message.stream = doCompress(stream, compress);
- message.stream.prepareWrite();
- message.prepared = true;
-
- if(message.outAsync != null)
- {
- IceInternal.TraceUtil.trace("sending asynchronous request", stream, _logger, _traceLevels);
- }
- else
- {
- IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels);
- }
-
+ callbacks.add(message);
}
+ _sendStreams.removeFirst();
- if(!_transceiver.write(message.stream.getBuffer()))
+ //
+ // If there's nothing left to send, we're done.
+ //
+ if(_sendStreams.isEmpty())
{
- return false;
+ break;
}
+
+ //
+ // Otherwise, prepare the next message stream for writing.
+ //
+ message = _sendStreams.getFirst();
+ assert(!message.prepared);
+ IceInternal.BasicStream stream = message.stream;
- message.sent(this, true);
+ message.stream = doCompress(stream, message.compress);
+ message.stream.prepareWrite();
+ message.prepared = true;
- if(message.outAsync instanceof Ice.AMISentCallback)
+ if(message.outAsync != null)
+ {
+ IceInternal.TraceUtil.trace("sending asynchronous request", stream, _logger, _traceLevels);
+ }
+ else
{
- _sentCallbacks.add(message);
+ IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels);
}
+ _writeStream.swap(message.stream);
- _sendStreams.removeFirst();
+ //
+ // Send the message.
+ //
+ if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer()))
+ {
+ assert(!_writeStream.isEmpty());
+ scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout());
+ return callbacks;
+ }
}
}
- finally
+ catch(Ice.LocalException ex)
{
- if(flushSentCallbacks && !_sentCallbacks.isEmpty())
- {
- _threadPool.execute(_flushSentCallbacks);
- }
+ setState(StateClosed, ex);
+ return callbacks;
}
- return true;
- }
- private void
- flushSentCallbacks()
- {
- java.util.List<OutgoingMessage> sentCallbacks;
- synchronized(this)
- {
- assert(_sentCallbacks != null && !_sentCallbacks.isEmpty());
- sentCallbacks = _sentCallbacks;
- _sentCallbacks = new java.util.LinkedList<OutgoingMessage>();
- }
- for(OutgoingMessage message : sentCallbacks)
- {
- message.outAsync.__sent(_instance);
- }
+ assert(_writeStream.isEmpty());
+ _threadPool.unregister(this, IceInternal.SocketOperation.Write);
+ return callbacks;
}
private boolean
sendMessage(OutgoingMessage message)
{
- assert(_state != StateClosed);
- if(_sendInProgress)
+ assert(_state < StateClosed);
+ if(!_sendStreams.isEmpty())
{
message.adopt();
_sendStreams.addLast(message);
return false;
}
- assert(!_sendInProgress);
-
//
// Attempt to send the message without blocking. If the send blocks, we register
// the connection with the selector thread or we request the caller to call
@@ -1752,8 +1839,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
IceInternal.BasicStream stream = message.stream;
- boolean compress = _overrideCompress ? _overrideCompressValue : message.compress;
- message.stream = doCompress(stream, compress);
+ message.stream = doCompress(stream, message.compress);
message.stream.prepareWrite();
message.prepared = true;
@@ -1775,11 +1861,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
return true;
}
+ message.adopt();
+ _writeStream.swap(message.stream);
_sendStreams.addLast(message);
- _sendInProgress = true;
- message.adopt();
- _selectorThread._register(_socketReadyCallback, IceInternal.SocketStatus.NeedWrite, _endpoint.timeout());
+ scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout());
+ _threadPool.register(this, IceInternal.SocketOperation.Write);
return false;
}
@@ -1850,15 +1937,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
IceInternal.OutgoingAsync outAsync;
}
- private void
- parseMessage(MessageInfo info)
+ private MessageInfo
+ parseMessage(IceInternal.BasicStream stream)
{
assert(_state > StateNotValidated && _state < StateClosed);
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
- }
+ MessageInfo info = new MessageInfo(stream);
+
+ _readStream.swap(info.stream);
+ _readStream.resize(IceInternal.Protocol.headerSize, true);
+ _readStream.pos(0);
+
+ assert(info.stream.pos() == info.stream.size());
try
{
@@ -1874,11 +1964,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_compressionSupported)
{
- IceInternal.BasicStream ustream = info.stream.uncompress(IceInternal.Protocol.headerSize);
- if(ustream != info.stream)
- {
- info.stream = ustream;
- }
+ info.stream = info.stream.uncompress(IceInternal.Protocol.headerSize);
}
else
{
@@ -1998,7 +2084,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_warn)
{
- _logger.warning("udp connection exception:\n" + ex + _desc);
+ _logger.warning("datagram connection exception:\n" + ex + '\n' + _desc);
}
}
else
@@ -2006,6 +2092,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
setState(StateClosed, ex);
}
}
+
+ return info;
}
private void
@@ -2095,6 +2183,55 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
+ private void
+ scheduleTimeout(int status, int timeout)
+ {
+ if(timeout < 0)
+ {
+ return;
+ }
+
+ if((status & IceInternal.SocketOperation.Read) != 0)
+ {
+ _timer.schedule(_readTimeout, timeout);
+ _readTimeoutScheduled = true;
+ }
+ if((status & IceInternal.SocketOperation.Write) != 0)
+ {
+ _timer.schedule(_writeTimeout, timeout);
+ _writeTimeoutScheduled = true;
+ }
+ }
+
+ private void
+ unscheduleTimeout(int status)
+ {
+ if((status & IceInternal.SocketOperation.Read) != 0 && _readTimeoutScheduled)
+ {
+ _timer.cancel(_readTimeout);
+ _readTimeoutScheduled = false;
+ }
+ if((status & IceInternal.SocketOperation.Write) != 0 && _writeTimeoutScheduled)
+ {
+ _timer.cancel(_writeTimeout);
+ _writeTimeoutScheduled = false;
+ }
+ }
+
+ private int
+ connectTimeout()
+ {
+ IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideConnectTimeout)
+ {
+ return defaultsAndOverrides.overrideConnectTimeoutValue;
+ }
+ else
+ {
+ return _endpoint.timeout();
+ }
+ }
+
private void
warning(String msg, Exception ex)
{
@@ -2285,69 +2422,31 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
boolean prepared;
}
- static class SocketReadyCallback extends IceInternal.SelectorThread.SocketReadyCallback
- {
- public
- SocketReadyCallback(ConnectionI connection)
- {
- _connection = connection;
- }
-
- public java.nio.channels.SelectableChannel
- fd()
- {
- return _connection.fd();
- }
-
- public boolean
- hasMoreData()
- {
- return _connection.hasMoreData();
- }
-
- public IceInternal.SocketStatus
- socketReady()
- {
- return _connection.socketReady();
- }
-
- public void
- socketFinished()
- {
- _connection.socketFinished();
- }
-
- public void
- runTimerTask()
- {
- _connection.socketTimeout();
- }
-
- final private ConnectionI _connection;
- };
-
- private IceInternal.Transceiver _transceiver;
- private Ice.BooleanHolder _hasMoreData = new Ice.BooleanHolder(false);
-
+ private final IceInternal.Instance _instance;
+ private final IceInternal.Transceiver _transceiver;
private String _desc;
private final String _type;
private final IceInternal.EndpointI _endpoint;
- private final SocketReadyCallback _socketReadyCallback = new SocketReadyCallback(this);
private ObjectAdapter _adapter;
private IceInternal.ServantManager _servantManager;
private final Logger _logger;
private final IceInternal.TraceLevels _traceLevels;
-
private final IceInternal.ThreadPool _threadPool;
- private final IceInternal.SelectorThread _selectorThread;
+
+ private final IceInternal.Timer _timer;
+ private final IceInternal.TimerTask _writeTimeout;
+ private boolean _writeTimeoutScheduled;
+ private final IceInternal.TimerTask _readTimeout;
+ private boolean _readTimeoutScheduled;
private StartCallback _startCallback = null;
+ private Ice.BooleanHolder _hasMoreData = new Ice.BooleanHolder(false);
private final boolean _warn;
-
- private final int _acmTimeout;
+ private final boolean _warnUdp;
+ private final long _acmTimeout;
private long _acmAbsoluteTimeoutMillis;
private final int _compressionLevel;
@@ -2369,18 +2468,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private int _batchMarker;
private java.util.LinkedList<OutgoingMessage> _sendStreams = new java.util.LinkedList<OutgoingMessage>();
- private boolean _sendInProgress;
- private java.util.List<OutgoingMessage> _sentCallbacks = new java.util.LinkedList<OutgoingMessage>();
- private IceInternal.ThreadPoolWorkItem _flushSentCallbacks = new IceInternal.ThreadPoolWorkItem()
- {
- public void
- execute(IceInternal.ThreadPool threadPool)
- {
- threadPool.promoteFollower(null);
- ConnectionI.this.flushSentCallbacks();
- };
- };
+ private IceInternal.BasicStream _readStream;
+ private IceInternal.BasicStream _writeStream;
private int _dispatchCount;
@@ -2395,7 +2485,5 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private static boolean _compressionSupported = IceInternal.BasicStream.compressible();
- private boolean _overrideCompress;
- private boolean _overrideCompressValue;
private boolean _cacheBuffers;
}
diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java
index 62589a5bad7..674e690d27b 100644
--- a/java/src/Ice/ObjectAdapterI.java
+++ b/java/src/Ice/ObjectAdapterI.java
@@ -23,8 +23,6 @@ public final class ObjectAdapterI implements ObjectAdapter
public synchronized Communicator
getCommunicator()
{
- checkForDeactivation();
-
return _communicator;
}
@@ -364,7 +362,6 @@ public final class ObjectAdapterI implements ObjectAdapter
//
_instance = null;
_threadPool = null;
- _communicator = null;
_routerEndpoints = null;
_routerInfo = null;
_publishedEndpoints = null;
@@ -828,7 +825,6 @@ public final class ObjectAdapterI implements ObjectAdapter
_deactivated = true;
_destroyed = true;
_instance = null;
- _communicator = null;
_incomingConnectionFactories = null;
InitializationException ex = new InitializationException();
@@ -995,7 +991,6 @@ public final class ObjectAdapterI implements ObjectAdapter
{
IceUtilInternal.Assert.FinalizerAssert(_threadPool == null);
//IceUtilInternal.Assert.FinalizerAssert(_servantManager == null); // Not cleared, it needs to be immutable.
- IceUtilInternal.Assert.FinalizerAssert(_communicator == null);
IceUtilInternal.Assert.FinalizerAssert(_incomingConnectionFactories == null);
IceUtilInternal.Assert.FinalizerAssert(_directCount == 0);
IceUtilInternal.Assert.FinalizerAssert(!_waitForActivate);
diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java
index ecc43a8918b..77112002911 100644
--- a/java/src/IceInternal/BasicStream.java
+++ b/java/src/IceInternal/BasicStream.java
@@ -54,36 +54,37 @@ public class BasicStream
reset()
{
_buf.reset();
+ clear();
+ }
+ public void
+ clear()
+ {
if(_readEncapsStack != null)
{
assert(_readEncapsStack.next == null);
_readEncapsStack.next = _readEncapsCache;
_readEncapsCache = _readEncapsStack;
- _readEncapsStack = null;
_readEncapsCache.reset();
+ _readEncapsStack = null;
}
- if(_objectList != null)
+ if(_writeEncapsStack != null)
{
- _objectList.clear();
+ assert(_writeEncapsStack.next == null);
+ _writeEncapsStack.next = _writeEncapsCache;
+ _writeEncapsCache = _writeEncapsStack;
+ _writeEncapsCache.reset();
+ _writeEncapsStack = null;
}
- _sliceObjects = true;
- }
-
- public void
- clear()
- {
- _readEncapsStack = null;
- _writeEncapsStack = null;
_seqDataStack = null;
-
+
if(_objectList != null)
{
_objectList.clear();
}
- _objectList = null;
+
_sliceObjects = true;
}
@@ -1780,14 +1781,8 @@ public class BasicStream
}
catch(java.lang.Exception ex)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- IceUtilInternal.OutputBase out = new IceUtilInternal.OutputBase(pw);
- out.setUseTab(false);
- out.print("exception raised by ice_postUnmarshal:\n");
- ex.printStackTrace(pw);
- pw.flush();
- _instance.initializationData().logger.warning(sw.toString());
+ String s = "exception raised by ice_postUnmarshal:\n" + Ex.toString(ex);
+ _instance.initializationData().logger.warning("exception raised by ice_postUnmarshal:\n");
}
}
}
@@ -1809,14 +1804,8 @@ public class BasicStream
}
catch(java.lang.Exception ex)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- IceUtilInternal.OutputBase out = new IceUtilInternal.OutputBase(pw);
- out.setUseTab(false);
- out.print("exception raised by ice_preMarshal:\n");
- ex.printStackTrace(pw);
- pw.flush();
- _instance.initializationData().logger.warning(sw.toString());
+ String s = "exception raised by ice_preUnmarshal:\n" + Ex.toString(ex);
+ _instance.initializationData().logger.warning("exception raised by ice_preUnmarshal:\n");
}
v.__write(this);
}
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index e5eb47d7957..b7b55502eea 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -297,9 +297,9 @@ public class ConnectRequestHandler
_reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
{
public void
- execute(ThreadPool threadPool)
+ execute(ThreadPoolCurrent current)
{
- threadPool.promoteFollower(null);
+ current.ioCompleted();
flushRequestsWithException(ex);
};
});
@@ -430,13 +430,13 @@ public class ConnectRequestHandler
{
request.os.pos(0);
os.writeBlob(request.os.readBlob(request.os.size()));
- _connection.finishBatchRequest(os, _compress);
}
catch(Ice.LocalException ex)
{
_connection.abortBatchRequest();
throw ex;
}
+ _connection.finishBatchRequest(os, _compress);
}
p.remove();
}
@@ -450,9 +450,9 @@ public class ConnectRequestHandler
_reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
{
public void
- execute(ThreadPool threadPool)
+ execute(ThreadPoolCurrent current)
{
- threadPool.promoteFollower(null);
+ current.ioCompleted();
flushRequestsWithException(ex);
};
});
@@ -467,9 +467,9 @@ public class ConnectRequestHandler
_reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
{
public void
- execute(ThreadPool threadPool)
+ execute(ThreadPoolCurrent current)
{
- threadPool.promoteFollower(null);
+ current.ioCompleted();
flushRequestsWithException(ex);
};
});
@@ -482,9 +482,9 @@ public class ConnectRequestHandler
instance.clientThreadPool().execute(new ThreadPoolWorkItem()
{
public void
- execute(ThreadPool threadPool)
+ execute(ThreadPoolCurrent current)
{
- threadPool.promoteFollower(null);
+ current.ioCompleted();
for(OutgoingAsyncMessageCallback callback : sentCallbacks)
{
callback.__sent(instance);
diff --git a/java/src/IceInternal/ConnectionMonitor.java b/java/src/IceInternal/ConnectionMonitor.java
index 654a5ca46d7..927a5ce768f 100644
--- a/java/src/IceInternal/ConnectionMonitor.java
+++ b/java/src/IceInternal/ConnectionMonitor.java
@@ -86,23 +86,6 @@ public final class ConnectionMonitor implements IceInternal.TimerTask
{
conn.monitor(now);
}
- catch(Ice.LocalException ex)
- {
- synchronized(this)
- {
- if(_instance == null)
- {
- return;
- }
-
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in connection monitor:\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- }
- }
catch(java.lang.Exception ex)
{
synchronized(this)
@@ -111,11 +94,7 @@ public final class ConnectionMonitor implements IceInternal.TimerTask
{
return;
}
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "unknown exception in connection monitor:\n" + sw.toString();
+ String s = "exception in connection monitor:\n" + Ex.toString(ex);
_instance.initializationData().logger.error(s);
}
}
diff --git a/java/src/IceInternal/EndpointHostResolver.java b/java/src/IceInternal/EndpointHostResolver.java
index c763fbca50d..0592b830039 100644
--- a/java/src/IceInternal/EndpointHostResolver.java
+++ b/java/src/IceInternal/EndpointHostResolver.java
@@ -25,11 +25,7 @@ public class EndpointHostResolver
}
catch(RuntimeException ex)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "cannot create thread for endpoint host resolver thread:\n" + sw.toString();
+ String s = "cannot create thread for endpoint host resolver thread:\n" + Ex.toString(ex);
_instance.initializationData().logger.error(s);
throw ex;
}
@@ -151,38 +147,15 @@ public class EndpointHostResolver
public void
run()
{
- if(_instance.initializationData().threadHook != null)
- {
- _instance.initializationData().threadHook.start();
- }
-
try
{
EndpointHostResolver.this.run();
}
- catch(Ice.LocalException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in endpoint host resolver thread " + getName() + ":\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- }
catch(java.lang.Exception ex)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "unknown exception in endpoint host resolver thread " + getName() + ":\n" + sw.toString();
+ String s = "exception in endpoint host resolver thread " + getName() + ":\n" + Ex.toString(ex);
_instance.initializationData().logger.error(s);
}
-
- if(_instance.initializationData().threadHook != null)
- {
- _instance.initializationData().threadHook.stop();
- }
}
}
diff --git a/java/src/IceInternal/EventHandler.java b/java/src/IceInternal/EventHandler.java
index ba7dfd69103..328db4ac1ca 100644
--- a/java/src/IceInternal/EventHandler.java
+++ b/java/src/IceInternal/EventHandler.java
@@ -9,66 +9,39 @@
package IceInternal;
-public abstract class EventHandler extends SelectorHandler
+public abstract class EventHandler
{
//
- // Return true if the handler is for a datagram transport, false otherwise.
+ // Called when there's a message ready to be processed.
//
- abstract public boolean datagram();
+ abstract public void message(ThreadPoolCurrent current);
//
- // Return true if read() must be called before calling message().
+ // Called when the event handler is unregistered.
//
- abstract public boolean readable();
+ abstract public void finished(ThreadPoolCurrent current);
//
- // Read data via the event handler. May only be called if
- // readable() returns true.
- //
- abstract public boolean read(BasicStream is);
-
- //
- // A complete message has been received.
+ // Get a textual representation of the event handler.
//
- abstract public void message(BasicStream stream, ThreadPool threadPool);
+ abstract public String toString();
//
- // Will be called if the event handler is finally
- // unregistered. (Calling unregister() does not unregister
- // immediately.)
+ // Get the native information of the handler, this is used by the selector.
//
- abstract public void finished(ThreadPool threadPool);
+ abstract public java.nio.channels.SelectableChannel fd();
//
- // Propagate an exception to the event handler.
+ // In Java, it's possible that the transceiver reads more data than what was
+ // really asked. If this is the case, hasMoreData() returns true and the handler
+ // read() method should be called again (without doing a select()). This is
+ // handled by the Selector class (it adds the handler to a separate list of
+ // handlers if this method returns true.)
//
- abstract public void exception(Ice.LocalException ex);
+ abstract public boolean hasMoreData();
- //
- // Get a textual representation of the event handler.
- //
- abstract public String toString();
-
- public IceInternal.Instance
- instance()
- {
- return _instance;
- }
-
- protected
- EventHandler(Instance instance)
- {
- _instance = instance;
- _stream = new BasicStream(instance);
- }
-
- protected Instance _instance;
-
- //
- // The _stream data member is only for use by the ThreadPool or by the
- // connection for validation.
- //
- protected BasicStream _stream;
- boolean _serializing;
- boolean _registered;
+ int _disabled = 0;
+ int _registered = 0;
+ int _ready = 0;
+ java.nio.channels.SelectionKey _key = null;
}
diff --git a/java/src/IceInternal/Ex.java b/java/src/IceInternal/Ex.java
index 07ec004ff12..01f8f9ebdad 100644
--- a/java/src/IceInternal/Ex.java
+++ b/java/src/IceInternal/Ex.java
@@ -23,4 +23,16 @@ public class Ex
throw new Ice.MemoryLimitException("requested " + requested + " bytes, maximum allowed is " + maximum +
" bytes (see Ice.MessageSizeMax)");
}
+
+ //
+ // A small utility to get the strack trace of the exception (which also includes toString()).
+ //
+ public static String toString(java.lang.Throwable ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ return sw.toString();
+ }
}
diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java
index e83805776bf..d13039c1995 100644
--- a/java/src/IceInternal/IncomingConnectionFactory.java
+++ b/java/src/IceInternal/IncomingConnectionFactory.java
@@ -80,7 +80,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
// First we wait until the factory is destroyed. If we are using
// an acceptor, we also wait for it to be closed.
//
- while(_state != StateClosed || _acceptor != null)
+ while(_state != StateFinished)
{
try
{
@@ -172,170 +172,111 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
}
//
- // Operations from SelectorHandler.
- //
-
- public java.nio.channels.SelectableChannel
- fd()
- {
- assert(_acceptor != null);
- return _acceptor.fd();
- }
-
- public boolean
- hasMoreData()
- {
- assert(_acceptor != null);
- return false;
- }
-
- //
// Operations from EventHandler.
//
- public boolean
- datagram()
- {
- return _endpoint.datagram();
- }
-
- public boolean
- readable()
- {
- return false;
- }
-
- public boolean
- read(BasicStream unused)
- {
- assert(false); // Must not be called.
- return false;
- }
-
public void
- message(BasicStream unused, ThreadPool threadPool)
+ message(ThreadPoolCurrent current)
{
Ice.ConnectionI connection = null;
-
- try
+ synchronized(this)
{
- synchronized(this)
+ if(_state >= StateClosed)
{
- if(_state != StateActive)
+ return;
+ }
+ else if(_state == StateHolding)
+ {
+ Thread.yield();
+ return;
+ }
+
+ //
+ // Reap connections for which destruction has completed.
+ //
+ java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator();
+ while(p.hasNext())
+ {
+ Ice.ConnectionI con = p.next();
+ if(con.isFinished())
{
- Thread.yield();
- return;
+ p.remove();
}
-
- //
- // Reap connections for which destruction has completed.
- //
- java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator();
- while(p.hasNext())
+ }
+
+ //
+ // Now accept a new connection.
+ //
+ Transceiver transceiver = null;
+ try
+ {
+ transceiver = _acceptor.accept();
+ }
+ catch(Ice.SocketException ex)
+ {
+ if(Network.noMoreFds(ex.getCause()))
{
- Ice.ConnectionI con = p.next();
- if(con.isFinished())
+ try
{
- p.remove();
+ String s = "fatal error: can't accept more connections:\n" + ex.getCause().getMessage();
+ s += '\n' + _acceptor.toString();
+ _instance.initializationData().logger.error(s);
}
- }
-
- //
- // Now accept a new connection.
- //
- Transceiver transceiver = null;
- try
- {
- transceiver = _acceptor.accept();
- }
- catch(Ice.SocketException ex)
- {
- if(Network.noMoreFds(ex.getCause()))
+ finally
{
- try
- {
- String s = "fatal error: can't accept more connections:\n" + ex.getCause().getMessage();
- s += '\n' + _acceptor.toString();
- _instance.initializationData().logger.error(s);
- }
- finally
- {
- Runtime.getRuntime().halt(1);
- }
- }
-
- // Ignore socket exceptions.
- return;
- }
- catch(Ice.TimeoutException ex)
- {
- // Ignore timeouts.
- return;
+ Runtime.getRuntime().halt(1);
+ }
}
- catch(Ice.LocalException ex)
+
+ // Ignore socket exceptions.
+ return;
+ }
+ catch(Ice.LocalException ex)
+ {
+ // Warn about other Ice local exceptions.
+ if(_warn)
{
- // Warn about other Ice local exceptions.
- if(_warn)
- {
- warning(ex);
- }
- return;
+ warning(ex);
}
+ return;
+ }
- assert(transceiver != null);
+ assert(transceiver != null);
+ try
+ {
+ connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter);
+ }
+ catch(Ice.LocalException ex)
+ {
try
{
- connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter);
+ transceiver.close();
}
- catch(Ice.LocalException ex)
+ catch(Ice.LocalException exc)
{
- try
- {
- transceiver.close();
- }
- catch(Ice.LocalException exc)
- {
- // Ignore
- }
-
- if(_warn)
- {
- warning(ex);
- }
- return;
+ // Ignore
}
- _connections.add(connection);
+ if(_warn)
+ {
+ warning(ex);
+ }
+ return;
}
- }
- finally
- {
- //
- // This makes sure that we promote a follower before we leave the scope of the mutex
- // above, but after we call accept() (if we call it).
- //
- threadPool.promoteFollower(null);
+
+ _connections.add(connection);
}
+ assert(connection != null);
connection.start(this);
}
public synchronized void
- finished(ThreadPool threadPool)
+ finished(ThreadPoolCurrent current)
{
- threadPool.promoteFollower(null);
- assert(threadPool == ((Ice.ObjectAdapterI)_adapter).getThreadPool() && _state == StateClosed);
-
- _acceptor.close();
- _acceptor = null;
- notifyAll();
- }
-
- public void
- exception(Ice.LocalException ex)
- {
- assert(false); // Must not be called.
+ assert(_state == StateClosed);
+ setState(StateFinished);
}
public synchronized String
@@ -350,6 +291,20 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
return _acceptor.toString();
}
+ public java.nio.channels.SelectableChannel
+ fd()
+ {
+ assert(_acceptor != null);
+ return _acceptor.fd();
+ }
+
+ public boolean
+ hasMoreData()
+ {
+ assert(_acceptor != null);
+ return false;
+ }
+
//
// Operations from ConnectionI.StartCallback
//
@@ -369,11 +324,11 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
public synchronized void
connectionStartFailed(Ice.ConnectionI connection, Ice.LocalException ex)
{
- if(_state == StateClosed)
+ if(_state >= StateClosed)
{
return;
}
-
+
if(_warn)
{
warning(ex);
@@ -393,7 +348,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
public
IncomingConnectionFactory(Instance instance, EndpointI endpoint, Ice.ObjectAdapter adapter, String adapterName)
{
- super(instance);
+ _instance = instance;
_endpoint = endpoint;
_adapter = adapter;
_warn = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false;
@@ -448,6 +403,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
_endpoint = h.value;
assert(_acceptor != null);
_acceptor.listen();
+ ((Ice.ObjectAdapterI)_adapter).getThreadPool().initialize(this);
}
}
catch(java.lang.Exception ex)
@@ -492,8 +448,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
finalize()
throws Throwable
{
- IceUtilInternal.Assert.FinalizerAssert(_state == StateClosed);
- IceUtilInternal.Assert.FinalizerAssert(_acceptor == null);
+ IceUtilInternal.Assert.FinalizerAssert(_state == StateFinished);
IceUtilInternal.Assert.FinalizerAssert(_connections == null);
super.finalize();
@@ -502,6 +457,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
private static final int StateActive = 0;
private static final int StateHolding = 1;
private static final int StateClosed = 2;
+ private static final int StateFinished = 3;
private void
setState(int state)
@@ -521,7 +477,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
}
if(_acceptor != null)
{
- ((Ice.ObjectAdapterI)_adapter).getThreadPool()._register(this);
+ ((Ice.ObjectAdapterI)_adapter).getThreadPool().register(this, SocketOperation.Read);
}
java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator();
@@ -541,7 +497,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
}
if(_acceptor != null)
{
- ((Ice.ObjectAdapterI)_adapter).getThreadPool().unregister(this);
+ ((Ice.ObjectAdapterI)_adapter).getThreadPool().unregister(this, SocketOperation.Read);
}
java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator();
@@ -559,6 +515,10 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
{
((Ice.ObjectAdapterI)_adapter).getThreadPool().finish(this);
}
+ else
+ {
+ state = StateFinished;
+ }
java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator();
while(p.hasNext())
@@ -568,6 +528,16 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
}
break;
}
+
+ case StateFinished:
+ {
+ assert(_state == StateClosed);
+ if(_acceptor != null)
+ {
+ _acceptor.close();
+ }
+ break;
+ }
}
_state = state;
@@ -577,14 +547,12 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
private void
warning(Ice.LocalException ex)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "connection exception:\n" + sw.toString() + '\n' + _acceptor.toString();
+ String s = "connection exception:\n" + Ex.toString(ex) + '\n' + _acceptor.toString();
_instance.initializationData().logger.warning(s);
}
+ private final IceInternal.Instance _instance;
+
private Acceptor _acceptor;
private final Transceiver _transceiver;
private EndpointI _endpoint;
diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java
index ab73e298d64..54006b95dd2 100644
--- a/java/src/IceInternal/Instance.java
+++ b/java/src/IceInternal/Instance.java
@@ -175,18 +175,6 @@ public final class Instance
return _serverThreadPool;
}
- public synchronized SelectorThread
- selectorThread()
- {
- if(_state == StateDestroyed)
- {
- throw new Ice.CommunicatorDestroyedException();
- }
-
- assert(_selectorThread != null);
- return _selectorThread;
- }
-
public synchronized EndpointHostResolver
endpointHostResolver()
{
@@ -716,11 +704,7 @@ public final class Instance
}
catch(RuntimeException ex)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "cannot create thread for endpoint host resolver:\n" + sw.toString();
+ String s = "cannot create thread for endpoint host resolver:\n" + Ex.toString(ex);
_initData.logger.error(s);
throw ex;
}
@@ -735,19 +719,13 @@ public final class Instance
}
catch(RuntimeException ex)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "cannot create thread for timer:\n" + sw.toString();
+ String s = "cannot create thread for timer:\n" + Ex.toString(ex);
_initData.logger.error(s);
throw ex;
}
_clientThreadPool = new ThreadPool(this, "Ice.ThreadPool.Client", 0);
- _selectorThread = new SelectorThread(this);
-
//
// Add Process and Properties facets
//
@@ -781,7 +759,6 @@ public final class Instance
IceUtilInternal.Assert.FinalizerAssert(_objectAdapterFactory == null);
IceUtilInternal.Assert.FinalizerAssert(_clientThreadPool == null);
IceUtilInternal.Assert.FinalizerAssert(_serverThreadPool == null);
- IceUtilInternal.Assert.FinalizerAssert(_selectorThread == null);
IceUtilInternal.Assert.FinalizerAssert(_endpointHostResolver == null);
IceUtilInternal.Assert.FinalizerAssert(_timer == null);
IceUtilInternal.Assert.FinalizerAssert(_routerManager == null);
@@ -922,7 +899,6 @@ public final class Instance
ThreadPool serverThreadPool = null;
ThreadPool clientThreadPool = null;
- SelectorThread selectorThread = null;
EndpointHostResolver endpointHostResolver = null;
synchronized(this)
@@ -951,13 +927,6 @@ public final class Instance
_clientThreadPool = null;
}
- if(_selectorThread != null)
- {
- _selectorThread.destroy();
- selectorThread = _selectorThread;
- _selectorThread = null;
- }
-
if(_endpointHostResolver != null)
{
_endpointHostResolver.destroy();
@@ -1027,10 +996,6 @@ public final class Instance
{
serverThreadPool.joinWithAllThreads();
}
- if(selectorThread != null)
- {
- selectorThread.joinWithThread();
- }
if(endpointHostResolver != null)
{
endpointHostResolver.joinWithThread();
@@ -1105,7 +1070,6 @@ public final class Instance
private int _protocolSupport;
private ThreadPool _clientThreadPool;
private ThreadPool _serverThreadPool;
- private SelectorThread _selectorThread;
private EndpointHostResolver _endpointHostResolver;
private RetryQueue _retryQueue;
private Timer _timer;
diff --git a/java/src/IceInternal/LocalExceptionWrapper.java b/java/src/IceInternal/LocalExceptionWrapper.java
index 11529b52a6f..2a135f7eea7 100644
--- a/java/src/IceInternal/LocalExceptionWrapper.java
+++ b/java/src/IceInternal/LocalExceptionWrapper.java
@@ -62,11 +62,7 @@ public class LocalExceptionWrapper extends Exception
}
throw new LocalExceptionWrapper(new Ice.UnknownLocalException(((Ice.LocalException)ex).ice_name()), false);
}
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- throw new LocalExceptionWrapper(new Ice.UnknownException(sw.toString()), false);
+ throw new LocalExceptionWrapper(new Ice.UnknownException(Ex.toString(ex)), false);
}
private Ice.LocalException _ex;
diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
index d47147f6e35..3f875de3336 100644
--- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java
+++ b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
@@ -105,9 +105,9 @@ abstract public class OutgoingAsyncMessageCallback
__os.instance().clientThreadPool().execute(new ThreadPoolWorkItem()
{
public void
- execute(ThreadPool threadPool)
+ execute(ThreadPoolCurrent current)
{
- threadPool.promoteFollower(null);
+ current.ioCompleted();
__exception(ex);
}
});
@@ -149,14 +149,8 @@ abstract public class OutgoingAsyncMessageCallback
{
if(instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- IceUtilInternal.OutputBase out = new IceUtilInternal.OutputBase(pw);
- out.setUseTab(false);
- out.print("exception raised by AMI callback:\n");
- ex.printStackTrace(pw);
- pw.flush();
- instance.initializationData().logger.warning(sw.toString());
+ String s = "exception raised by AMI callback:\n" + Ex.toString(ex);
+ instance.initializationData().logger.warning(s);
}
}
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 6ce51eac569..02eabeb4e81 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -669,6 +669,7 @@ public final class OutgoingConnectionFactory
// is necessary to support the interruption of the connection initialization and validation
// in case the communicator is destroyed.
//
+ Ice.ConnectionI connection = null;
try
{
if(_destroyed)
@@ -676,23 +677,7 @@ public final class OutgoingConnectionFactory
throw new Ice.CommunicatorDestroyedException();
}
- Ice.ConnectionI connection = new Ice.ConnectionI(_instance, transceiver, ci.endpoint.compress(false),null);
-
- java.util.List<Ice.ConnectionI> connectionList = _connections.get(ci);
- if(connectionList == null)
- {
- connectionList = new java.util.LinkedList<Ice.ConnectionI>();
- _connections.put(ci, connectionList);
- }
- connectionList.add(connection);
- connectionList = _connectionsByEndpoint.get(ci.endpoint);
- if(connectionList == null)
- {
- connectionList = new java.util.LinkedList<Ice.ConnectionI>();
- _connectionsByEndpoint.put(ci.endpoint, connectionList);
- }
- connectionList.add(connection);
- return connection;
+ connection = new Ice.ConnectionI(_instance, transceiver, ci.endpoint.compress(false),null);
}
catch(Ice.LocalException ex)
{
@@ -706,6 +691,22 @@ public final class OutgoingConnectionFactory
}
throw ex;
}
+
+ java.util.List<Ice.ConnectionI> connectionList = _connections.get(ci);
+ if(connectionList == null)
+ {
+ connectionList = new java.util.LinkedList<Ice.ConnectionI>();
+ _connections.put(ci, connectionList);
+ }
+ connectionList.add(connection);
+ connectionList = _connectionsByEndpoint.get(ci.endpoint);
+ if(connectionList == null)
+ {
+ connectionList = new java.util.LinkedList<Ice.ConnectionI>();
+ _connectionsByEndpoint.put(ci.endpoint, connectionList);
+ }
+ connectionList.add(connection);
+ return connection;
}
private void
diff --git a/java/src/IceInternal/PropertyNames.java b/java/src/IceInternal/PropertyNames.java
index 15e20c6b9a5..62ecfb4d287 100644
--- a/java/src/IceInternal/PropertyNames.java
+++ b/java/src/IceInternal/PropertyNames.java
@@ -8,7 +8,7 @@
// **********************************************************************
//
-// Generated by makeprops.py from file ../config/PropertyNames.xml, Wed Jul 29 10:07:20 2009
+// Generated by makeprops.py from file ./config/PropertyNames.xml, Tue Aug 11 09:12:32 2009
// IMPORTANT: Do not edit this file -- any edits made here will be lost!
@@ -33,6 +33,7 @@ public final class PropertyNames
new Property("Ice\\.Admin\\.ThreadPool\\.SizeWarn", false, null),
new Property("Ice\\.Admin\\.ThreadPool\\.StackSize", false, null),
new Property("Ice\\.Admin\\.ThreadPool\\.Serialize", false, null),
+ new Property("Ice\\.Admin\\.ThreadPool\\.ThreadIdleTime", false, null),
new Property("Ice\\.Admin\\.ThreadPool\\.ThreadPriority", false, null),
new Property("Ice\\.Admin\\.DelayCreation", false, null),
new Property("Ice\\.Admin\\.Facets", false, null),
@@ -100,12 +101,14 @@ public final class PropertyNames
new Property("Ice\\.ThreadPool\\.Client\\.SizeWarn", false, null),
new Property("Ice\\.ThreadPool\\.Client\\.StackSize", false, null),
new Property("Ice\\.ThreadPool\\.Client\\.Serialize", false, null),
+ new Property("Ice\\.ThreadPool\\.Client\\.ThreadIdleTime", false, null),
new Property("Ice\\.ThreadPool\\.Client\\.ThreadPriority", false, null),
new Property("Ice\\.ThreadPool\\.Server\\.Size", false, null),
new Property("Ice\\.ThreadPool\\.Server\\.SizeMax", false, null),
new Property("Ice\\.ThreadPool\\.Server\\.SizeWarn", false, null),
new Property("Ice\\.ThreadPool\\.Server\\.StackSize", false, null),
new Property("Ice\\.ThreadPool\\.Server\\.Serialize", false, null),
+ new Property("Ice\\.ThreadPool\\.Server\\.ThreadIdleTime", false, null),
new Property("Ice\\.ThreadPool\\.Server\\.ThreadPriority", false, null),
new Property("Ice\\.ThreadPriority", false, null),
new Property("Ice\\.Trace\\.GC", false, null),
@@ -153,6 +156,7 @@ public final class PropertyNames
new Property("IceBox\\.ServiceManager\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceBox\\.ServiceManager\\.ThreadPool\\.StackSize", false, null),
new Property("IceBox\\.ServiceManager\\.ThreadPool\\.Serialize", false, null),
+ new Property("IceBox\\.ServiceManager\\.ThreadPool\\.ThreadIdleTime", false, null),
new Property("IceBox\\.ServiceManager\\.ThreadPool\\.ThreadPriority", false, null),
new Property("IceBox\\.Trace\\.ServiceObserver", false, null),
new Property("IceBox\\.UseSharedCommunicator\\.[^\\s]+", false, null),
@@ -200,6 +204,7 @@ public final class PropertyNames
new Property("IceGrid\\.Node\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceGrid\\.Node\\.ThreadPool\\.StackSize", false, null),
new Property("IceGrid\\.Node\\.ThreadPool\\.Serialize", false, null),
+ new Property("IceGrid\\.Node\\.ThreadPool\\.ThreadIdleTime", false, null),
new Property("IceGrid\\.Node\\.ThreadPool\\.ThreadPriority", false, null),
new Property("IceGrid\\.Node\\.AllowRunningServersAsRoot", false, null),
new Property("IceGrid\\.Node\\.AllowEndpointsOverride", false, null),
@@ -251,6 +256,7 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.StackSize", false, null),
new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.Serialize", false, null),
+ new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.ThreadIdleTime", false, null),
new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.ThreadPriority", false, null),
new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.EndpointSelection", false, null),
new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.ConnectionCached", false, null),
@@ -274,6 +280,7 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.StackSize", false, null),
new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.Serialize", false, null),
+ new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.ThreadIdleTime", false, null),
new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.ThreadPriority", false, null),
new Property("IceGrid\\.Registry\\.CryptPasswords", false, null),
new Property("IceGrid\\.Registry\\.Data", false, null),
@@ -292,6 +299,7 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.StackSize", false, null),
new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.Serialize", false, null),
+ new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.ThreadIdleTime", false, null),
new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.ThreadPriority", false, null),
new Property("IceGrid\\.Registry\\.NodeSessionTimeout", false, null),
new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.EndpointSelection", false, null),
@@ -318,6 +326,7 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.StackSize", false, null),
new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.Serialize", false, null),
+ new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.ThreadIdleTime", false, null),
new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.ThreadPriority", false, null),
new Property("IceGrid\\.Registry\\.SessionFilters", false, null),
new Property("IceGrid\\.Registry\\.SessionManager\\.AdapterId", false, null),
@@ -333,6 +342,7 @@ public final class PropertyNames
new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.SizeWarn", false, null),
new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.StackSize", false, null),
new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.Serialize", false, null),
+ new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.ThreadIdleTime", false, null),
new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.ThreadPriority", false, null),
new Property("IceGrid\\.Registry\\.SessionTimeout", false, null),
new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.EndpointSelection", false, null),
@@ -375,6 +385,7 @@ public final class PropertyNames
new Property("IcePatch2\\.ThreadPool\\.SizeWarn", false, null),
new Property("IcePatch2\\.ThreadPool\\.StackSize", false, null),
new Property("IcePatch2\\.ThreadPool\\.Serialize", false, null),
+ new Property("IcePatch2\\.ThreadPool\\.ThreadIdleTime", false, null),
new Property("IcePatch2\\.ThreadPool\\.ThreadPriority", false, null),
new Property("IcePatch2\\.Admin\\.AdapterId", true, null),
new Property("IcePatch2\\.Admin\\.Endpoints", true, null),
@@ -465,6 +476,7 @@ public final class PropertyNames
new Property("Glacier2\\.Client\\.ThreadPool\\.SizeWarn", false, null),
new Property("Glacier2\\.Client\\.ThreadPool\\.StackSize", false, null),
new Property("Glacier2\\.Client\\.ThreadPool\\.Serialize", false, null),
+ new Property("Glacier2\\.Client\\.ThreadPool\\.ThreadIdleTime", false, null),
new Property("Glacier2\\.Client\\.ThreadPool\\.ThreadPriority", false, null),
new Property("Glacier2\\.Client\\.AlwaysBatch", false, null),
new Property("Glacier2\\.Client\\.Buffered", false, null),
@@ -515,6 +527,7 @@ public final class PropertyNames
new Property("Glacier2\\.Server\\.ThreadPool\\.SizeWarn", false, null),
new Property("Glacier2\\.Server\\.ThreadPool\\.StackSize", false, null),
new Property("Glacier2\\.Server\\.ThreadPool\\.Serialize", false, null),
+ new Property("Glacier2\\.Server\\.ThreadPool\\.ThreadIdleTime", false, null),
new Property("Glacier2\\.Server\\.ThreadPool\\.ThreadPriority", false, null),
new Property("Glacier2\\.Server\\.AlwaysBatch", false, null),
new Property("Glacier2\\.Server\\.Buffered", false, null),
diff --git a/java/src/IceInternal/Selector.java b/java/src/IceInternal/Selector.java
index de45ba303fa..9427db70330 100644
--- a/java/src/IceInternal/Selector.java
+++ b/java/src/IceInternal/Selector.java
@@ -11,21 +11,18 @@ package IceInternal;
public final class Selector
{
+ static final class TimeoutException extends Exception
+ {
+ }
+
public
- Selector(Instance instance, int timeout)
+ Selector(Instance instance)
{
_instance = instance;
- _timeout = timeout;
- _interruptCount = 0;
- Network.SocketPair pair = Network.createPipe();
- _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
- _fdIntrWrite = pair.sink;
try
{
_selector = java.nio.channels.Selector.open();
- pair.source.configureBlocking(false);
- _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ);
}
catch(java.io.IOException ex)
{
@@ -52,87 +49,203 @@ public final class Selector
{
}
_selector = null;
+ }
- try
+ public void
+ initialize(EventHandler handler)
+ {
+ updateImpl(handler);
+ }
+
+ public void
+ update(EventHandler handler, int remove, int add)
+ {
+ int previous = handler._registered;
+ handler._registered = handler._registered & ~remove;
+ handler._registered = handler._registered | add;
+ if(previous == handler._registered)
{
- _fdIntrWrite.close();
+ return;
}
- catch(java.io.IOException ex)
+ updateImpl(handler);
+
+ if(handler.hasMoreData() && (handler._disabled & SocketOperation.Read) == 0)
{
+ if((add & SocketOperation.Read) != 0)
+ {
+ _pendingHandlers.add(handler);
+ }
+ if((remove & SocketOperation.Read) != 0)
+ {
+ _pendingHandlers.remove(handler);
+ }
}
- _fdIntrWrite = null;
+ }
- try
+ public void
+ enable(EventHandler handler, int status)
+ {
+ if((handler._disabled & status) == 0)
{
- _fdIntrRead.close();
+ return;
}
- catch(java.io.IOException ex)
+ handler._disabled = handler._disabled & ~status;
+
+ if((handler._registered & status) != 0)
{
+ updateImpl(handler);
+
+ if((status & SocketOperation.Read) != 0 && handler.hasMoreData())
+ {
+ // Add back the pending handler if reads are enabled.
+ _pendingHandlers.add(handler);
+ }
}
- _fdIntrRead = null;
}
public void
- add(SelectorHandler handler, SocketStatus status)
+ disable(EventHandler handler, int status)
{
- // Note: we can't support noInterrupt for add() because a channel can't be registered again
- // with the selector until the previous selection key has been removed from the cancel-key
- // set of the selector on the next select() operation.
-
- handler._pendingStatus = status;
- if(_changes.add(handler))
- {
- setInterrupt();
+ if((handler._disabled & status) != 0)
+ {
+ return;
+ }
+ handler._disabled = handler._disabled | status;
+
+ if((handler._registered & status) != 0)
+ {
+ updateImpl(handler);
+
+ if((status & SocketOperation.Read) != 0 && handler.hasMoreData())
+ {
+ // Remove the pending handler if reads are disabled.
+ _pendingHandlers.remove(handler);
+ }
}
}
public void
- update(SelectorHandler handler, SocketStatus newStatus)
+ finish(EventHandler handler)
{
- // Note: can only be called from the select() thread
- assert(handler._key != null);
- handler._key.interestOps(convertStatus(handler.fd(), newStatus));
+ handler._registered = 0;
+
+ if(handler._key != null)
+ {
+ handler._key.cancel();
+ handler._key = null;
+ }
+
+ _changes.remove(handler);
+ _pendingHandlers.remove(handler);
}
public void
- remove(SelectorHandler handler)
+ startSelect()
{
- // Note: we can't support noInterrupt for remove() because a channel can't be registered again
- // with the selector until the previous selection key has been removed from the cancel-key
- // set of the selector on the next select() operation.
+ assert(_changes.isEmpty());
- handler._pendingStatus = IceInternal.SocketStatus.Finished;
- if(_changes.add(handler))
+ //
+ // Don't set _selecting = true if there are pending handlers, select() won't block
+ // and will just call selectNow().
+ //
+ if(_pendingHandlers.isEmpty())
{
- setInterrupt();
+ _selecting = true;
}
}
public void
- select()
- throws java.io.IOException
+ finishSelect(java.util.List<EventHandler> handlers, long timeout)
{
- //
- // If there are still interrupts, selected keys or pending handlers to process,
- // return immediately.
- //
- if(_interrupted || !_keys.isEmpty() || !_pendingHandlers.isEmpty())
+ _selecting = false;
+ handlers.clear();
+
+ if(!_changes.isEmpty())
{
+ for(EventHandler h : _changes)
+ {
+ updateImpl(h);
+ }
+ _changes.clear();
+ }
+ else if(_keys.isEmpty() && _pendingHandlers.isEmpty() && timeout <= 0)
+ {
+ //
+ // This is necessary to prevent a busy loop in case of a spurious wake-up which
+ // sometime occurs in the client thread pool when the communicator is destroyed.
+ // If there are too many successive spurious wake-ups, we log an error.
+ //
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch(java.lang.InterruptedException ex)
+ {
+ }
+
+ if(++_spuriousWakeUp > 100)
+ {
+ _instance.initializationData().logger.error("spurious selector wake up");
+ }
return;
}
+
+ _spuriousWakeUp = 0;
- //
- // There's nothing left to process, we can now select.
- //
+ for(java.nio.channels.SelectionKey key : _keys)
+ {
+ EventHandler handler = (EventHandler)key.attachment();
+ try
+ {
+ handler._ready = fromJavaOps(key.readyOps());
+ if(handler.hasMoreData() && _pendingHandlers.remove(handler))
+ {
+ handler._ready |= SocketOperation.Read;
+ }
+ handlers.add(handler);
+ }
+ catch(java.nio.channels.CancelledKeyException ex)
+ {
+ assert(handler._registered == 0);
+ }
+ }
+ _keys.clear();
+
+ for(EventHandler handler : _pendingHandlers)
+ {
+ if(handler.hasMoreData())
+ {
+ handler._ready = SocketOperation.Read;
+ handlers.add(handler);
+ }
+ }
+ _pendingHandlers.clear();
+ }
+
+ public void
+ select(long timeout)
+ throws TimeoutException
+ {
while(true)
{
try
{
- if(_nextPendingHandlers.isEmpty())
+ //
+ // Only block if _selecting = true, otherwise we call selectNow() to retrieve new
+ // ready handlers and process handlers from _pendingHandlers.
+ //
+ if(_selecting)
{
- if(_timeout > 0)
+ if(timeout > 0)
{
- _selector.select(_timeout * 1000);
+ long before = IceInternal.Time.currentMonotonicTimeMillis();
+ if(_selector.select(timeout * 1000) == 0)
+ {
+ if(IceInternal.Time.currentMonotonicTimeMillis() - before >= timeout * 1000)
+ {
+ throw new TimeoutException();
+ }
+ }
}
else
{
@@ -142,10 +255,6 @@ public final class Selector
else
{
_selector.selectNow();
-
- java.util.HashSet<SelectorHandler> tmp = _nextPendingHandlers;
- _nextPendingHandlers = _pendingHandlers;
- _pendingHandlers = tmp;
}
}
catch(java.nio.channels.CancelledKeyException ex)
@@ -166,326 +275,127 @@ public final class Selector
continue;
}
- throw ex;
+ try
+ {
+ String s = "fatal error: selector failed:\n" + ex.getCause().getMessage();
+ _instance.initializationData().logger.error(s);
+ }
+ finally
+ {
+ Runtime.getRuntime().halt(1);
+ }
}
break;
}
}
- public SelectorHandler
- getNextSelected()
+ public void
+ hasMoreData(EventHandler handler)
{
- assert(_interruptCount == 0);
+ assert(!_selecting && handler.hasMoreData());
- if(_iter == null && !_keys.isEmpty())
- {
- _iter = _keys.iterator();
- }
-
- while(_iter != null && _iter.hasNext())
- {
- java.nio.channels.SelectionKey key = _iter.next();
- _iter.remove();
- SelectorHandler handler = (SelectorHandler)key.attachment();
- if(handler == null)
- {
- assert(_pendingInterruptRead > 0);
- _pendingInterruptRead -= readInterrupt(_pendingInterruptRead);
- continue;
- }
- else if(handler._key == null || !handler._key.isValid())
- {
- continue;
- }
- if(handler.hasMoreData())
- {
- assert(_pendingIter == null);
- _pendingHandlers.remove(handler);
- }
- if(!_iter.hasNext())
- {
- _iter = null;
- }
- return handler;
- }
-
- if(_pendingIter == null && !_pendingHandlers.isEmpty())
- {
- _pendingIter = _pendingHandlers.iterator();
- }
-
- while(_pendingIter != null && _pendingIter.hasNext())
+ //
+ // Only add the handler if read is still registered and enabled.
+ //
+ if((handler._registered & ~handler._disabled & SocketOperation.Read) != 0)
{
- SelectorHandler handler = _pendingIter.next();
- _pendingIter.remove();
- if(handler._key == null || !handler._key.isValid() || !handler.hasMoreData())
- {
- continue;
- }
- if(!_pendingIter.hasNext())
- {
- _pendingIter = null;
- }
- return handler;
+ _pendingHandlers.add(handler);
}
-
- _iter = null;
- _pendingIter = null;
- return null;
}
- public void
- hasMoreData(SelectorHandler handler)
- {
- _nextPendingHandlers.add(handler);
- }
-
- public boolean
- processInterrupt()
+ private void
+ updateImpl(EventHandler handler)
{
- assert(_changes.size() <= _interruptCount);
-
- if(!_changes.isEmpty())
+ if(_selecting)
{
- for(SelectorHandler handler : _changes)
- {
- if(handler._pendingStatus == SocketStatus.Finished)
- {
- removeImpl(handler);
- }
- else
- {
- addImpl(handler, handler._pendingStatus);
- }
- clearInterrupt();
- }
- _changes.clear();
-
//
- // We call selectNow() to flush the cancelled-key set and ensure handlers can be
- // added again once this returns.
+ // Queue the change since we can't change the selection key interest ops while a select
+ // operation is in progress (it could block depending on the underlying implementaiton
+ // of the Java selector).
//
- try
+ if(_changes.isEmpty())
{
- _selector.selectNow();
+ _selector.wakeup();
}
- catch(java.io.IOException ex)
- {
- // Ignore.
- }
-
- //
- // Current iterator is invalidated by selectNow().
- //
- _iter = null;
- _pendingIter = null;
+ _changes.add(handler);
+ return;
}
-
- _interrupted = _interruptCount > 0;
- return _interruptCount == 0; // No more interrupts to process.
- }
- public boolean
- checkTimeout()
- {
- if(_interruptCount == 0 && _keys.isEmpty() && _pendingHandlers.isEmpty())
+ int ops = toJavaOps(handler, handler._registered & ~handler._disabled);
+ if(handler._key == null)
{
- if(_timeout <= 0)
+ if(handler._registered != 0)
{
- //
- // This is necessary to prevent a busy loop in case of a spurious wake-up which
- // sometime occurs in the client thread pool when the communicator is destroyed.
- // If there are too many successive spurious wake-ups, we log an error.
- //
try
{
- Thread.sleep(1);
+ handler._key = handler.fd().register(_selector, ops, handler);
}
- catch(java.lang.InterruptedException ex)
+ catch(java.nio.channels.ClosedChannelException ex)
{
+ assert(false);
}
-
- if(++_spuriousWakeUp > 100)
- {
- _instance.initializationData().logger.error("spurious selector wake up");
- }
- return false;
}
- return true;
}
else
{
- _spuriousWakeUp = 0;
- return false;
- }
- }
-
- public boolean
- isInterrupted()
- {
- return _interruptCount > 0;
- }
-
- public void
- setInterrupt()
- {
- if(++_interruptCount == 1)
- {
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
- buf.put(0, (byte)0);
- while(buf.hasRemaining())
- {
- try
- {
- _fdIntrWrite.write(buf);
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
+ handler._key.interestOps(ops);
}
}
- public boolean
- clearInterrupt()
+ int
+ toJavaOps(EventHandler handler, int o)
{
- if(--_interruptCount == 0)
+ int op = 0;
+ if((o & SocketOperation.Read) != 0)
{
- //
- // If the interrupt byte has not been received by the pipe yet, we just increment
- // _pendingInterruptRead. It will be read when the _fdIntrRead is ready for read.
- //
- if(_keys.contains(_fdIntrReadKey))
+ if((handler.fd().validOps() & java.nio.channels.SelectionKey.OP_READ) != 0)
{
- readInterrupt(1);
- _keys.remove(_fdIntrReadKey);
- _iter = null;
- _pendingIter = null;
+ op |= java.nio.channels.SelectionKey.OP_READ;
}
else
{
- ++_pendingInterruptRead;
+ op |= java.nio.channels.SelectionKey.OP_ACCEPT;
}
- _interrupted = false;
- return false;
- }
- else
- {
- return true;
}
- }
-
- private int
- readInterrupt(int count)
- {
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(count);
- try
+ if((o & SocketOperation.Write) != 0)
{
- buf.rewind();
- int ret = _fdIntrRead.read(buf);
- assert(ret > 0);
- return ret;
+ op |= java.nio.channels.SelectionKey.OP_WRITE;
}
- catch(java.io.IOException ex)
+ if((o & SocketOperation.Connect) != 0)
{
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- throw se;
- }
- }
-
- private int
- convertStatus(java.nio.channels.SelectableChannel fd, SocketStatus status)
- {
- if(status == SocketStatus.NeedConnect)
- {
- return java.nio.channels.SelectionKey.OP_CONNECT;
- }
- else if(status == SocketStatus.NeedRead)
- {
- if((fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
- {
- return java.nio.channels.SelectionKey.OP_READ;
- }
- else
- {
- return java.nio.channels.SelectionKey.OP_ACCEPT;
- }
- }
- else
- {
- assert(status == SocketStatus.NeedWrite);
- return java.nio.channels.SelectionKey.OP_WRITE;
+ op |= java.nio.channels.SelectionKey.OP_CONNECT;
}
+ return op;
}
- private void
- addImpl(SelectorHandler handler, SocketStatus status)
+ int
+ fromJavaOps(int o)
{
- if(handler._key != null)
+ int op = 0;
+ if((o & (java.nio.channels.SelectionKey.OP_READ | java.nio.channels.SelectionKey.OP_ACCEPT)) != 0)
{
- handler._key.interestOps(convertStatus(handler.fd(), status));
+ op |= SocketOperation.Read;
}
- else
+ if((o & java.nio.channels.SelectionKey.OP_WRITE) != 0)
{
- try
- {
- handler._key = handler.fd().register(_selector, convertStatus(handler.fd(), status), handler);
- }
- catch(java.nio.channels.ClosedChannelException ex)
- {
- assert(false);
- }
- assert(!_nextPendingHandlers.contains(handler));
+ op |= SocketOperation.Write;
}
-
- if(handler.hasMoreData())
+ if((o & java.nio.channels.SelectionKey.OP_CONNECT) != 0)
{
- _nextPendingHandlers.add(handler);
+ op |= SocketOperation.Connect;
}
+ return op;
}
- private void
- removeImpl(SelectorHandler handler)
- {
- _nextPendingHandlers.remove(handler);
-
- if(handler._key != null)
- {
- try
- {
- handler._key.cancel();
- handler._key = null;
- }
- catch(java.nio.channels.CancelledKeyException ex)
- {
- assert(false);
- }
- }
- }
final private Instance _instance;
- final private int _timeout;
private java.nio.channels.Selector _selector;
- private java.nio.channels.ReadableByteChannel _fdIntrRead;
- private java.nio.channels.WritableByteChannel _fdIntrWrite;
- private java.nio.channels.SelectionKey _fdIntrReadKey;
private java.util.Set<java.nio.channels.SelectionKey> _keys;
- private java.util.Iterator<java.nio.channels.SelectionKey> _iter;
- private java.util.HashSet<SelectorHandler> _changes = new java.util.HashSet<SelectorHandler>();
-
- private boolean _interrupted;
+ private java.util.HashSet<EventHandler> _changes = new java.util.HashSet<EventHandler>();
+ private java.util.HashSet<EventHandler> _pendingHandlers = new java.util.HashSet<EventHandler>();
+ private boolean _selecting;
private int _spuriousWakeUp;
- private int _interruptCount;
- private int _pendingInterruptRead;
-
- private java.util.HashSet<SelectorHandler> _pendingHandlers = new java.util.HashSet<SelectorHandler>();
- private java.util.HashSet<SelectorHandler> _nextPendingHandlers = new java.util.HashSet<SelectorHandler>();
- private java.util.Iterator<SelectorHandler> _pendingIter;
-};
+}
diff --git a/java/src/IceInternal/SelectorHandler.java b/java/src/IceInternal/SelectorHandler.java
deleted file mode 100644
index 39ec56f7929..00000000000
--- a/java/src/IceInternal/SelectorHandler.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package IceInternal;
-
-abstract class SelectorHandler
-{
- abstract public java.nio.channels.SelectableChannel fd();
-
- //
- // In Java, it's possible that the transceiver reads more data
- // than what was really asked. If this is the case, hasMoreData()
- // returns true and the handler read() method should be called
- // again (without doing a select()). This is handled by the
- // Selector class (it adds the handler to a separate list of handlers
- // if this method returns true.)
- //
- abstract public boolean hasMoreData();
-
- //
- // The _key data member are only for use by the Selector.
- //
- protected java.nio.channels.SelectionKey _key;
- protected SocketStatus _pendingStatus;
-};
diff --git a/java/src/IceInternal/SelectorThread.java b/java/src/IceInternal/SelectorThread.java
deleted file mode 100644
index 8dfab6449c7..00000000000
--- a/java/src/IceInternal/SelectorThread.java
+++ /dev/null
@@ -1,307 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package IceInternal;
-
-public class SelectorThread
-{
- static public abstract class SocketReadyCallback extends SelectorHandler implements TimerTask
- {
- abstract public SocketStatus socketReady();
- abstract public void socketFinished();
-
- //
- // The selector thread doesn't unregister the callback when sockectTimeout is called; socketTimeout
- // must unregister the callback either explicitly with unregister() or by shutting down the socket
- // (if necessary).
- //
- //abstract void socketTimeout();
-
- protected int _timeout;
- protected SocketStatus _status;
- protected SocketStatus _previousStatus;
- }
-
- SelectorThread(Instance instance)
- {
- _instance = instance;
- _destroyed = false;
- _selector = new Selector(instance, 0);
-
- try
- {
- _thread = new HelperThread();
- if(_instance.initializationData().properties.getProperty("Ice.ThreadPriority") != "")
- {
- _thread.setPriority(Util.getThreadPriorityProperty(_instance.initializationData().properties, "Ice"));
- }
- _thread.start();
- }
- catch(RuntimeException ex)
- {
- _selector.destroy();
- _selector = null;
-
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "cannot create thread for selector thread:\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- throw ex;
- }
-
- _timer = _instance.timer();
- }
-
- protected synchronized void
- finalize()
- throws Throwable
- {
- IceUtilInternal.Assert.FinalizerAssert(_destroyed);
- }
-
- public synchronized void
- destroy()
- {
- assert(!_destroyed);
- _destroyed = true;
- _selector.setInterrupt();
- }
-
- public synchronized void
- _register(SocketReadyCallback cb, SocketStatus status, int timeout)
- {
- assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories.
- assert(status != SocketStatus.Finished);
-
- cb._timeout = timeout;
- cb._status = status;
- if(cb._timeout >= 0)
- {
- _timer.schedule(cb, cb._timeout);
- }
-
- _selector.add(cb, status);
- }
-
- public synchronized void
- unregister(SocketReadyCallback cb)
- {
- // Note: unregister should only be called from the socketReady() call-back.
- assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories.
- assert(cb._status != SocketStatus.Finished);
-
- _selector.remove(cb);
- cb._status = SocketStatus.Finished;
- }
-
- public synchronized void
- finish(SocketReadyCallback cb)
- {
- assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories.
- assert(cb._status != SocketStatus.Finished);
-
- _selector.remove(cb);
- cb._status = SocketStatus.Finished;
-
- _finished.add(cb);
- _selector.setInterrupt();
- }
-
- public void
- joinWithThread()
- {
- if(_thread != null)
- {
- try
- {
- _thread.join();
- }
- catch(InterruptedException ex)
- {
- }
- }
- }
-
- public void
- run()
- {
- while(true)
- {
- try
- {
- _selector.select();
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- //throw se;
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- se.printStackTrace(pw);
- pw.flush();
- String s = "exception in selector thread:\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- continue;
- }
-
- java.util.LinkedList<SocketReadyCallback> readyList = new java.util.LinkedList<SocketReadyCallback>();
- boolean finished = false;
-
- synchronized(this)
- {
- _selector.checkTimeout();
-
- if(_selector.isInterrupted())
- {
- if(_selector.processInterrupt())
- {
- continue;
- }
-
- //
- // There are two possiblities for an interrupt:
- //
- // 1. The selector thread has been destroyed.
- // 2. A callback is being finished
- //
-
- //
- // Thread destroyed?
- //
- if(_destroyed)
- {
- break;
- }
-
- do
- {
- SocketReadyCallback cb = _finished.removeFirst();
- cb._previousStatus = SocketStatus.Finished;
- readyList.add(cb);
- }
- while(_selector.clearInterrupt()); // As long as there are interrupts
- finished = true;
- }
- else
- {
- SocketReadyCallback cb;
- while((cb = (SocketReadyCallback)_selector.getNextSelected()) != null)
- {
- cb._previousStatus = cb._status;
- readyList.add(cb);
- }
- }
- }
-
- for(SocketReadyCallback cb : readyList)
- {
- SocketStatus status = SocketStatus.Finished;
- try
- {
- if(cb._timeout >= 0)
- {
- _timer.cancel(cb);
- }
-
- if(finished)
- {
- cb.socketFinished();
- }
- else
- {
- status = cb.socketReady();
- }
- }
- catch(java.lang.Exception ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in selector thread " + _thread.getName() +
- " while calling socketReady():\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- status = SocketStatus.Finished;
- }
-
- if(status != SocketStatus.Finished)
- {
- if(cb.hasMoreData())
- {
- _selector.hasMoreData(cb);
- }
-
- if(status != cb._previousStatus)
- {
- synchronized(this)
- {
- // The callback might have been finished concurrently.
- if(cb._status != SocketStatus.Finished)
- {
- _selector.update(cb, status);
- cb._status = status;
- }
- }
- }
-
- if(cb._timeout >= 0)
- {
- _timer.schedule(cb, cb._timeout);
- }
- }
- }
- }
-
- assert(_destroyed);
-
- _selector.destroy();
- }
-
- private Instance _instance;
- private boolean _destroyed;
- private Selector _selector;
- private java.util.LinkedList<SocketReadyCallback> _finished = new java.util.LinkedList<SocketReadyCallback>();
-
- private final class HelperThread extends Thread
- {
- HelperThread()
- {
- String threadName = _instance.initializationData().properties.getProperty("Ice.ProgramName");
- if(threadName.length() > 0)
- {
- threadName += "-";
- }
- setName(threadName + "Ice.SelectorThread");
- }
-
- public void
- run()
- {
- try
- {
- SelectorThread.this.run();
- }
- catch(java.lang.Exception ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in selector thread " + getName() + ":\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- }
- }
- }
-
- private HelperThread _thread;
- private Timer _timer;
-}
diff --git a/java/src/IceInternal/ServantManager.java b/java/src/IceInternal/ServantManager.java
index ddee73a6693..c50ee74b45d 100644
--- a/java/src/IceInternal/ServantManager.java
+++ b/java/src/IceInternal/ServantManager.java
@@ -288,13 +288,9 @@ public final class ServantManager
}
catch(java.lang.Exception ex)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
String s = "exception during locator deactivation:\n" + "object adapter: `" + _adapterName + "'\n" +
- "locator category: `" + p.getKey() + "'\n" + sw.toString();
- logger.error(s);
+ "locator category: `" + p.getKey() + "'\n" + Ex.toString(ex);
+ _instance.initializationData().logger.error(s);
}
}
}
diff --git a/java/src/IceInternal/SocketOperation.java b/java/src/IceInternal/SocketOperation.java
new file mode 100644
index 00000000000..9a2350e1033
--- /dev/null
+++ b/java/src/IceInternal/SocketOperation.java
@@ -0,0 +1,20 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+import java.nio.channels.SelectionKey;
+
+public class SocketOperation
+{
+ public static final int None = 0;
+ public static final int Read = SelectionKey.OP_READ;
+ public static final int Write = SelectionKey.OP_WRITE;
+ public static final int Connect = SelectionKey.OP_CONNECT;
+}
diff --git a/java/src/IceInternal/SocketStatus.java b/java/src/IceInternal/SocketStatus.java
deleted file mode 100644
index 5635b7958c8..00000000000
--- a/java/src/IceInternal/SocketStatus.java
+++ /dev/null
@@ -1,39 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package IceInternal;
-
-public final class SocketStatus
-{
- private static SocketStatus[] _values = new SocketStatus[4];
-
- public static final int _Finished = 0;
- public static final SocketStatus Finished = new SocketStatus(_Finished);
- public static final int _NeedConnect = 1;
- public static final SocketStatus NeedConnect = new SocketStatus(_NeedConnect);
- public static final int _NeedRead = 2;
- public static final SocketStatus NeedRead = new SocketStatus(_NeedRead);
- public static final int _NeedWrite = 3;
- public static final SocketStatus NeedWrite = new SocketStatus(_NeedWrite);
-
- public int
- value()
- {
- return _value;
- }
-
- private
- SocketStatus(int val)
- {
- _value = val;
- _values[val] = this;
- }
-
- private int _value;
-}
diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java
index d322169a719..6d4a2901f5e 100644
--- a/java/src/IceInternal/TcpTransceiver.java
+++ b/java/src/IceInternal/TcpTransceiver.java
@@ -18,13 +18,13 @@ final class TcpTransceiver implements Transceiver
return _fd;
}
- public SocketStatus
+ public int
initialize()
{
if(_state == StateNeedConnect)
{
_state = StateConnectPending;
- return SocketStatus.NeedConnect;
+ return SocketOperation.Connect;
}
else if(_state <= StateConnectPending)
{
@@ -51,13 +51,13 @@ final class TcpTransceiver implements Transceiver
}
}
assert(_state == StateConnected);
- return SocketStatus.Finished;
+ return SocketOperation.None;
}
public void
close()
{
- if(_traceLevels.network >= 1)
+ if(_state == StateConnected && _traceLevels.network >= 1)
{
String s = "closing tcp connection\n" + toString();
_logger.trace(_traceLevels.networkCat, s);
@@ -85,9 +85,13 @@ final class TcpTransceiver implements Transceiver
{
final int size = buf.b.limit();
int packetSize = size - buf.b.position();
- if(_maxPacketSize > 0 && packetSize > _maxPacketSize)
+
+ //
+ // Limit packet size to avoid performance problems on WIN32
+ //
+ if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
{
- packetSize = _maxPacketSize;
+ packetSize = _maxSendPacketSize;
buf.b.limit(buf.b.position() + packetSize);
}
@@ -108,7 +112,7 @@ final class TcpTransceiver implements Transceiver
// Writing would block, so we reset the limit (if necessary) and return true to indicate
// that more data must be sent.
//
- if(packetSize == _maxPacketSize)
+ if(packetSize == _maxSendPacketSize)
{
buf.b.limit(size);
}
@@ -117,7 +121,7 @@ final class TcpTransceiver implements Transceiver
if(_traceLevels.network >= 3)
{
- String s = "sent " + ret + " of " + size + " bytes via tcp\n" + toString();
+ String s = "sent " + ret + " of " + packetSize + " bytes via tcp\n" + toString();
_logger.trace(_traceLevels.networkCat, s);
}
@@ -126,13 +130,13 @@ final class TcpTransceiver implements Transceiver
_stats.bytesSent(type(), ret);
}
- if(packetSize == _maxPacketSize)
+ if(packetSize == _maxSendPacketSize)
{
assert(buf.b.position() == buf.b.limit());
packetSize = size - buf.b.position();
- if(packetSize > _maxPacketSize)
+ if(packetSize > _maxSendPacketSize)
{
- packetSize = _maxPacketSize;
+ packetSize = _maxSendPacketSize;
}
buf.b.limit(buf.b.position() + packetSize);
}
@@ -154,11 +158,7 @@ final class TcpTransceiver implements Transceiver
public boolean
read(Buffer buf, Ice.BooleanHolder moreData)
{
- int remaining = 0;
- if(_traceLevels.network >= 3)
- {
- remaining = buf.b.remaining();
- }
+ int packetSize = buf.b.remaining();
moreData.value = false;
while(buf.b.hasRemaining())
@@ -182,7 +182,7 @@ final class TcpTransceiver implements Transceiver
{
if(_traceLevels.network >= 3)
{
- String s = "received " + ret + " of " + remaining + " bytes via tcp\n" + toString();
+ String s = "received " + ret + " of " + packetSize + " bytes via tcp\n" + toString();
_logger.trace(_traceLevels.networkCat, s);
}
@@ -191,6 +191,8 @@ final class TcpTransceiver implements Transceiver
_stats.bytesReceived(type(), ret);
}
}
+
+ packetSize = buf.b.remaining();
}
catch(java.io.InterruptedIOException ex)
{
@@ -240,7 +242,7 @@ final class TcpTransceiver implements Transceiver
_state = connected ? StateConnected : StateNeedConnect;
_desc = Network.fdToString(_fd);
- _maxPacketSize = 0;
+ _maxSendPacketSize = 0;
if(System.getProperty("os.name").startsWith("Windows"))
{
//
@@ -248,10 +250,10 @@ final class TcpTransceiver implements Transceiver
// poor throughput performances when transfering large amount of
// data. See Microsoft KB article KB823764.
//
- _maxPacketSize = Network.getSendBufferSize(_fd) / 2;
- if(_maxPacketSize < 512)
+ _maxSendPacketSize = Network.getSendBufferSize(_fd) / 2;
+ if(_maxSendPacketSize < 512)
{
- _maxPacketSize = 0;
+ _maxSendPacketSize = 0;
}
}
}
@@ -271,7 +273,7 @@ final class TcpTransceiver implements Transceiver
private Ice.Stats _stats;
private String _desc;
private int _state;
- private int _maxPacketSize;
+ private int _maxSendPacketSize;
private static final int StateNeedConnect = 0;
private static final int StateConnectPending = 1;
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 917c85182de..4494eb9396f 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -11,13 +11,62 @@ package IceInternal;
public final class ThreadPool
{
- private final static boolean TRACE_REGISTRATION = false;
- private final static boolean TRACE_INTERRUPT = false;
- private final static boolean TRACE_SHUTDOWN = false;
- private final static boolean TRACE_SELECT = false;
- private final static boolean TRACE_EXCEPTION = false;
- private final static boolean TRACE_THREAD = false;
- private final static boolean TRACE_STACK_TRACE = false;
+ final class ShutdownWorkItem implements ThreadPoolWorkItem
+ {
+ public void execute(ThreadPoolCurrent current)
+ {
+ current.ioCompleted();
+ try
+ {
+ _instance.objectAdapterFactory().shutdown();
+ }
+ catch(Ice.CommunicatorDestroyedException ex)
+ {
+ }
+ }
+ }
+
+ static final class FinishedWorkItem implements ThreadPoolWorkItem
+ {
+ public
+ FinishedWorkItem(EventHandler handler)
+ {
+ _handler = handler;
+ }
+
+ public void execute(ThreadPoolCurrent current)
+ {
+ _handler.finished(current);
+ }
+
+ private final EventHandler _handler;
+ }
+
+ static final class JoinThreadWorkItem implements ThreadPoolWorkItem
+ {
+ public
+ JoinThreadWorkItem(EventHandlerThread thread)
+ {
+ _thread = thread;
+ }
+
+ public void execute(ThreadPoolCurrent current)
+ {
+ // No call to ioCompleted, this shouldn't block (and we don't want to cause
+ // a new thread to be started).
+ _thread.join();
+ }
+
+ private final EventHandlerThread _thread;
+ }
+
+ //
+ // Exception raised by the thread pool work queue when the thread pool
+ // is destroyed.
+ //
+ static final class DestroyedException extends RuntimeException
+ {
+ }
public
ThreadPool(Instance instance, String prefix, int timeout)
@@ -25,32 +74,34 @@ public final class ThreadPool
_instance = instance;
_destroyed = false;
_prefix = prefix;
- _timeout = timeout;
- _selector = new Selector(instance, timeout);
+ _selector = new Selector(instance);
_threadIndex = 0;
- _running = 0;
_inUse = 0;
- _load = 1.0;
+ _inUseIO = 0;
_promote = true;
_serialize = _instance.initializationData().properties.getPropertyAsInt(_prefix + ".Serialize") > 0;
- _warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
+ _serverIdleTime = timeout;
+
+ Ice.Properties properties = _instance.initializationData().properties;
- String programName = _instance.initializationData().properties.getProperty("Ice.ProgramName");
+ String programName = properties.getProperty("Ice.ProgramName");
if(programName.length() > 0)
{
- _programNamePrefix = programName + "-";
+ _threadPrefix = programName + "-" + _prefix;
}
else
{
- _programNamePrefix = "";
+ _threadPrefix = _prefix;
}
+ int nProcessors = Runtime.getRuntime().availableProcessors();
+
//
// We use just one thread as the default. This is the fastest
// possible setting, still allows one level of nesting, and
// doesn't require to make the servants thread safe.
//
- int size = _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1);
+ int size = properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1);
if(size < 1)
{
String s = _prefix + ".Size < 1; Size adjusted to 1";
@@ -58,8 +109,11 @@ public final class ThreadPool
size = 1;
}
- int sizeMax =
- _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
+ int sizeMax = properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
+ if(sizeMax == -1)
+ {
+ sizeMax = nProcessors;
+ }
if(sizeMax < size)
{
String s = _prefix + ".SizeMax < " + _prefix + ".Size; SizeMax adjusted to Size (" + size + ")";
@@ -67,7 +121,7 @@ public final class ThreadPool
sizeMax = size;
}
- int sizeWarn = _instance.initializationData().properties.getPropertyAsInt( _prefix + ".SizeWarn");
+ int sizeWarn = properties.getPropertyAsIntWithDefault(_prefix + ".SizeWarn", sizeMax * 80 / 100);
if(sizeWarn != 0 && sizeWarn < size)
{
String s = _prefix + ".SizeWarn < " + _prefix + ".Size; adjusted SizeWarn to Size (" + size + ")";
@@ -81,28 +135,43 @@ public final class ThreadPool
sizeWarn = sizeMax;
}
+ int threadIdleTime = properties.getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60);
+ if(threadIdleTime < 0)
+ {
+ String s = _prefix + ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0";
+ _instance.initializationData().logger.warning(s);
+ threadIdleTime = 0;
+ }
+
_size = size;
_sizeMax = sizeMax;
_sizeWarn = sizeWarn;
+ _sizeIO = Math.min(sizeMax, nProcessors);
+ _threadIdleTime = threadIdleTime;
- int stackSize = _instance.initializationData().properties.getPropertyAsInt( _prefix + ".StackSize");
+ int stackSize = properties.getPropertyAsInt( _prefix + ".StackSize");
if(stackSize < 0)
{
String s = _prefix + ".StackSize < 0; Size adjusted to JRE default";
_instance.initializationData().logger.warning(s);
stackSize = 0;
}
-
_stackSize = stackSize;
- _hasPriority = _instance.initializationData().properties.getProperty(_prefix + ".ThreadPriority") != "";
- _priority = Util.getThreadPriorityProperty(_instance.initializationData().properties, _prefix);
- if(!_hasPriority)
+ boolean hasPriority = properties.getProperty(_prefix + ".ThreadPriority") != "";
+ int priority = properties.getPropertyAsInt(_prefix + ".ThreadPriority");
+ if(!hasPriority)
{
- _hasPriority = _instance.initializationData().properties.getProperty("Ice.ThreadPriority") != "";
- _priority = Util.getThreadPriorityProperty(_instance.initializationData().properties, "Ice");
+ hasPriority = properties.getProperty("Ice.ThreadPriority") != "";
+ priority = properties.getPropertyAsInt("Ice.ThreadPriority");
}
+ _hasPriority = hasPriority;
+ _priority = priority;
+
+ _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector);
+ _nextHandler = _handlers.iterator();
+
if(_instance.traceLevels().threadPool >= 1)
{
String s = "creating " + _prefix + ": Size = " + _size + ", SizeMax = " + _sizeMax + ", SizeWarn = " +
@@ -112,11 +181,9 @@ public final class ThreadPool
try
{
- _threads = new java.util.ArrayList<EventHandlerThread>();
for(int i = 0; i < _size; i++)
{
- EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" +
- _threadIndex++);
+ EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++);
_threads.add(thread);
if(_hasPriority)
{
@@ -126,16 +193,11 @@ public final class ThreadPool
{
thread.start(java.lang.Thread.NORM_PRIORITY);
}
- ++_running;
}
}
catch(RuntimeException ex)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString();
+ String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex);
_instance.initializationData().logger.error(s);
destroy();
@@ -154,158 +216,49 @@ public final class ThreadPool
public synchronized void
destroy()
{
- if(TRACE_SHUTDOWN)
- {
- trace("destroy");
- }
-
assert(!_destroyed);
_destroyed = true;
- _selector.setInterrupt();
+ _workQueue.destroy();
}
public synchronized void
- _register(EventHandler handler)
+ initialize(EventHandler handler)
{
assert(!_destroyed);
+ _selector.initialize(handler);
+ }
- if(!handler._registered)
- {
- if(TRACE_REGISTRATION)
- {
- trace("adding handler of type " + handler.getClass().getName() + " for channel " + handler.fd());
- }
-
- if(!handler._serializing)
- {
- _selector.add(handler, SocketStatus.NeedRead);
- }
- handler._registered = true;
- }
+ public void
+ register(EventHandler handler, int op)
+ {
+ update(handler, SocketOperation.None, op);
}
public synchronized void
- unregister(EventHandler handler)
+ update(EventHandler handler, int remove, int add)
{
assert(!_destroyed);
- if(handler._registered)
- {
- if(TRACE_REGISTRATION)
- {
- trace("removing handler for channel " + handler.fd());
- }
+ _selector.update(handler, remove, add);
+ }
- if(!handler._serializing)
- {
- _selector.remove(handler);
- }
- handler._registered = false;
- }
+ public void
+ unregister(EventHandler handler, int op)
+ {
+ update(handler, op, SocketOperation.None);
}
public synchronized void
finish(EventHandler handler)
{
assert(!_destroyed);
-
- if(TRACE_REGISTRATION)
- {
- trace("finishing handler for channel " + handler.fd());
- }
-
- if(handler._registered)
- {
- if(!handler._serializing)
- {
- _selector.remove(handler);
- }
- handler._registered = false;
- }
-
- _finished.add(handler);
- _selector.setInterrupt();
- }
-
- public synchronized void
- execute(ThreadPoolWorkItem workItem)
- {
- if(_destroyed)
- {
- throw new Ice.CommunicatorDestroyedException();
- }
- _workItems.add(workItem);
- _selector.setInterrupt();
+ _selector.finish(handler);
+ _workQueue.queue(new FinishedWorkItem(handler));
}
public void
- promoteFollower(EventHandler handler)
+ execute(ThreadPoolWorkItem workItem)
{
- if(_sizeMax > 1)
- {
- synchronized(this)
- {
- if(_serialize && handler != null)
- {
- handler._serializing = true;
- if(handler._registered)
- {
- _selector.remove(handler);
- }
- }
-
- assert(!_promote);
- _promote = true;
- notify();
-
- if(!_destroyed)
- {
- assert(_inUse >= 0);
- ++_inUse;
-
- if(_inUse == _sizeWarn)
- {
- String s = "thread pool `" + _prefix + "' is running low on threads\n"
- + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
- _instance.initializationData().logger.warning(s);
- }
-
- assert(_inUse <= _running);
- if(_inUse < _sizeMax && _inUse == _running)
- {
- if(_instance.traceLevels().threadPool >= 1)
- {
- String s = "growing " + _prefix + ": Size = " + (_running + 1);
- _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
- }
-
- try
- {
- EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" +
- _threadIndex++);
- _threads.add(thread);
- if(_hasPriority)
- {
- thread.start(_priority);
- }
- else
- {
- thread.start(java.lang.Thread.NORM_PRIORITY);
- }
- ++_running;
- }
- catch(RuntimeException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- }
- }
- }
- }
- }
+ _workQueue.queue(workItem);
}
public void
@@ -319,12 +272,13 @@ public final class ThreadPool
//
for(EventHandlerThread thread : _threads)
{
- thread.join(true);
+ thread.join();
}
//
// Destroy the selector
//
+ _workQueue.close();
_selector.destroy();
}
@@ -334,656 +288,308 @@ public final class ThreadPool
return _prefix;
}
- //
- // Each thread supplies a BasicStream, to avoid creating excessive
- // garbage (Java only).
- //
- private boolean
- run(BasicStream stream)
+ private void
+ run(EventHandlerThread thread)
{
- if(_sizeMax > 1)
+ ThreadPoolCurrent current = new ThreadPoolCurrent(_instance, this);
+ boolean select = false;
+ while(true)
{
- synchronized(this)
+ if(current._handler != null)
{
- while(!_promote)
+ try
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
+ current._handler.message(current);
+ }
+ catch(DestroyedException ex)
+ {
+ return;
+ }
+ catch(java.lang.Exception ex)
+ {
+ String s = "exception in `" + _prefix + "':\n" + Ex.toString(ex);
+ s += "\nevent handler: " + current._handler.toString();
+ _instance.initializationData().logger.error(s);
}
-
- _promote = false;
- }
-
- if(TRACE_THREAD)
- {
- trace("thread " + Thread.currentThread() + " has the lock");
- }
- }
-
- while(true)
- {
- try
- {
- _selector.select();
- }
- catch(java.io.IOException ex)
- {
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- //throw se;
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- se.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "':\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- continue;
}
-
- EventHandler handler = null;
- ThreadPoolWorkItem workItem = null;
- boolean finished = false;
- boolean shutdown = false;
-
- synchronized(this)
+ else if(select)
{
- if(_selector.checkTimeout())
+ try
{
- assert(_timeout > 0);
- shutdown = true;
+ _selector.select(_serverIdleTime);
}
- else if(_selector.isInterrupted())
+ catch(Selector.TimeoutException ex)
{
- if(_selector.processInterrupt())
+ synchronized(this)
{
+ if(!_destroyed && _inUse == 0)
+ {
+ _workQueue.queue(new ShutdownWorkItem()); // Select timed-out.
+ }
continue;
}
-
- //
- // There are three possiblities for an interrupt:
- //
- // 1. The thread pool has been destroyed.
- //
- // 2. An event handler is being finished.
- //
- // 3. A work item has been scheduled.
- //
-
- if(!_finished.isEmpty())
+ }
+ }
+
+ synchronized(this)
+ {
+ if(current._handler == null)
+ {
+ if(select)
{
- _selector.clearInterrupt();
- handler = _finished.removeFirst();
- finished = true;
+ _selector.finishSelect(_handlers, _serverIdleTime);
+ _nextHandler = _handlers.iterator();
+ select = false;
}
- else if(!_workItems.isEmpty())
+ else if(!current._leader && followerWait(thread, current))
{
- //
- // Work items must be executed first even if the thread pool is destroyed.
- //
- _selector.clearInterrupt();
- workItem = _workItems.removeFirst();
+ return; // Wait timed-out.
}
- else if(_destroyed)
+ }
+ else if(_sizeMax > 1)
+ {
+ if(!current._ioCompleted)
{
//
- // Don't clear the interrupt if destroyed, so that the other threads exit as well.
+ // The handler didn't call ioCompleted() so we take care of decreasing
+ // the IO thread count now.
//
- return true;
+ --_inUseIO;
+
+ if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
+ {
+ _selector.hasMoreData(current._handler);
+ }
}
else
{
- assert(false);
+ //
+ // If the handler called ioCompleted(), we re-enable the handler in
+ // case it was disabled and we decrease the number of thread in use.
+ //
+ _selector.enable(current._handler, current.operation);
+ assert(_inUse > 0);
+ --_inUse;
}
- }
- else
- {
- handler = (EventHandler)_selector.getNextSelected();
- if(handler == null)
+
+ if(!current._leader && followerWait(thread, current))
{
- continue;
+ return; // Wait timed-out.
}
}
- }
-
- //
- // Now we are outside the thread synchronization.
- //
-
- if(shutdown)
- {
- //
- // Initiate server shutdown.
- //
- ObjectAdapterFactory factory;
- try
- {
- factory = _instance.objectAdapterFactory();
- }
- catch(Ice.CommunicatorDestroyedException e)
+ else if(!current._ioCompleted &&
+ (current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
{
- continue;
+ _selector.hasMoreData(current._handler);
}
- promoteFollower(null);
- factory.shutdown();
-
//
- // No "continue", because we want shutdown to be done in
- // its own thread from this pool. Therefore we called
- // promoteFollower().
+ // Get the next ready handler.
//
- }
- else if(workItem != null)
- {
- try
+ if(_nextHandler.hasNext())
{
- workItem.execute(this);
+ current._ioCompleted = false;
+ current._handler = _nextHandler.next();
+ current.operation = current._handler._ready;
}
- catch(Ice.LocalException ex)
+ else
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "' while calling execute():\n" + sw.toString();
- _instance.initializationData().logger.error(s);
+ current._handler = null;
}
- //
- // No "continue", because we want execute() to
- // be called in its own thread from this
- // pool. Note that this means that execute()
- // must call promoteFollower().
- //
- }
- else
- {
- assert(handler != null);
-
- if(finished)
+ if(current._handler == null)
{
//
- // Notify a handler about its removal from the thread pool.
+ // If there are no more ready handlers and there are still threads busy performing
+ // IO, we give up leadership and promote another follower (which will perform the
+ // select() only once all the IOs are completed). Otherwise, if there's no more
+ // threads peforming IOs, it's time to do another select().
//
- try
+ if(_inUseIO > 0)
{
- handler.finished(this);
+ promoteFollower(current);
}
- catch(Ice.LocalException ex)
+ else
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "' while calling finished():\n" +
- sw.toString() + "\n" + handler.toString();
- _instance.initializationData().logger.error(s);
+ _selector.startSelect();
+ select = true;
}
-
- //
- // No "continue", because we want finished() to be
- // called in its own thread from this pool. Note
- // that this means that finished() must call
- // promoteFollower().
- //
}
- else
+ else if(_sizeMax > 1)
{
//
- // If the handler is "readable", try to read a
- // message.
+ // Increment the IO thread count and if there's still threads available
+ // to perform IO and more handlers ready, we promote a follower.
//
- try
- {
- if(handler.readable())
- {
- try
- {
- if(!read(handler))
- {
- continue; // Can't read without blocking.
- }
-
- if(handler.hasMoreData())
- {
- _selector.hasMoreData(handler);
- }
- }
- catch(Ice.TimeoutException ex)
- {
- assert(false); // This shouldn't occur as we only perform non-blocking reads.
- continue;
- }
- catch(Ice.DatagramLimitException ex) // Expected.
- {
- handler._stream.resize(0, true);
- continue;
- }
- catch(Ice.SocketException ex)
- {
- if(TRACE_EXCEPTION)
- {
- trace("informing handler (" + handler.getClass().getName() +
- ") about exception " + ex);
- ex.printStackTrace();
- }
-
- handler.exception(ex);
- continue;
- }
- catch(Ice.LocalException ex)
- {
- if(handler.datagram())
- {
- if(_instance.initializationData().properties.getPropertyAsInt(
- "Ice.Warn.Connections") > 0)
- {
- _instance.initializationData().logger.warning(
- "datagram connection exception:\n" + ex + "\n" + handler.toString());
- }
- handler._stream.resize(0, true);
- }
- else
- {
- if(TRACE_EXCEPTION)
- {
- trace("informing handler (" + handler.getClass().getName() +
- ") about exception " + ex);
- ex.printStackTrace();
- }
-
- handler.exception(ex);
- }
- continue;
- }
-
- stream.swap(handler._stream);
- assert(stream.pos() == stream.size());
- }
-
- //
- // Provide a new message to the handler.
- //
- try
- {
- handler.message(stream, this);
- }
- catch(Ice.LocalException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "' while calling message():\n" +
- sw.toString() + "\n" + handler.toString();
- _instance.initializationData().logger.error(s);
- }
-
- //
- // No "continue", because we want message() to
- // be called in its own thread from this
- // pool. Note that this means that message()
- // must call promoteFollower().
- //
- }
- finally
+ ++_inUseIO;
+ if(_nextHandler.hasNext() && _inUseIO < _sizeIO)
{
- stream.reset();
+ promoteFollower(current);
}
}
}
+ }
+ }
+
+ void
+ ioCompleted(ThreadPoolCurrent current)
+ {
+ current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called.
- if(_sizeMax > 1)
+ if(_sizeMax > 1)
+ {
+ synchronized(this)
{
- synchronized(this)
+ --_inUseIO;
+
+ if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
{
- if(!_destroyed)
- {
- if(_serialize && handler != null && handler._serializing)
- {
- if(handler._registered)
- {
- _selector.add(handler, SocketStatus.NeedRead);
- }
- handler._serializing = false;
- }
+ _selector.hasMoreData(current._handler);
+ }
+ if(_serialize && !_destroyed)
+ {
+ _selector.disable(current._handler, current.operation);
+ }
+
+ if(current._leader)
+ {
+ //
+ // If this thread is still the leader, it's time to promote a new leader.
+ //
+ promoteFollower(current);
+ }
+ else if(_promote && (_nextHandler.hasNext() || _inUseIO == 0))
+ {
+ notify();
+ }
- if(_size < _sizeMax) // Dynamic thread pool
+ assert(_inUse >= 0);
+ ++_inUse;
+
+ if(_inUse == _sizeWarn)
+ {
+ String s = "thread pool `" + _prefix + "' is running low on threads\n"
+ + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
+ _instance.initializationData().logger.warning(s);
+ }
+
+ if(!_destroyed)
+ {
+ assert(_inUse <= _threads.size());
+ if(_inUse < _sizeMax && _inUse == _threads.size())
+ {
+ if(_instance.traceLevels().threadPool >= 1)
{
- //
- // First we reap threads that have been
- // destroyed before.
- //
- int sz = _threads.size();
- assert(_running <= sz);
- if(_running < sz)
- {
- java.util.Iterator<EventHandlerThread> i = _threads.iterator();
- while(i.hasNext())
- {
- EventHandlerThread thread = i.next();
-
- if(!thread.isAlive())
- {
- if(thread.join(false))
- {
- i.remove();
- }
- }
- }
- }
+ String s = "growing " + _prefix + ": Size=" + (_threads.size() + 1);
+ _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
+ }
- //
- // Now we check if this thread can be destroyed, based
- // on a load factor.
- //
-
- //
- // The load factor jumps immediately to the number of
- // threads that are currently in use, but decays
- // exponentially if the number of threads in use is
- // smaller than the load factor. This reflects that we
- // create threads immediately when they are needed,
- // but want the number of threads to slowly decline to
- // the configured minimum.
- //
- double inUse = (double)_inUse;
- if(_load < inUse)
+ try
+ {
+ EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++);
+ _threads.add(thread);
+ if(_hasPriority)
{
- _load = inUse;
+ thread.start(_priority);
}
else
{
- final double loadFactor = 0.05; // TODO: Configurable?
- final double oneMinusLoadFactor = 1 - loadFactor;
- _load = _load * oneMinusLoadFactor + _inUse * loadFactor;
- }
-
- if(_running > _size)
- {
- int load = (int)(_load + 0.5);
-
- //
- // We add one to the load factor because one
- // additional thread is needed for select().
- //
- if(load + 1 < _running)
- {
- if(_instance.traceLevels().threadPool >= 1)
- {
- String s = "shrinking " + _prefix + ": Size = " + (_running - 1);
- _instance.initializationData().logger.trace(
- _instance.traceLevels().threadPoolCat, s);
- }
-
- assert(_inUse > 0);
- --_inUse;
-
- assert(_running > 0);
- --_running;
-
- return false;
- }
+ thread.start(java.lang.Thread.NORM_PRIORITY);
}
}
-
- assert(_inUse > 0);
- --_inUse;
- }
-
- //
- // Do not wait to be promoted again to release these objects.
- //
- handler = null;
- workItem = null;
-
- while(!_promote)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
+ catch(RuntimeException ex)
{
+ String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex);
+ _instance.initializationData().logger.error(s);
}
}
-
- _promote = false;
- }
-
- if(TRACE_THREAD)
- {
- trace("thread " + Thread.currentThread() + " has the lock");
}
}
}
- }
-
- private boolean
- read(EventHandler handler)
- {
- BasicStream stream = handler._stream;
-
- if(stream.pos() >= Protocol.headerSize)
- {
- if(!handler.read(stream))
- {
- return false;
- }
- assert(stream.pos() == stream.size());
- return true;
- }
-
- if(stream.size() == 0)
+ else if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
{
- stream.resize(Protocol.headerSize, true);
- stream.pos(0);
- }
-
- if(stream.pos() != stream.size())
- {
- if(!handler.read(stream))
+ synchronized(this)
{
- return false;
+ _selector.hasMoreData(current._handler);
}
- assert(stream.pos() == stream.size());
- }
-
- int pos = stream.pos();
- if(pos < Protocol.headerSize)
- {
- //
- // This situation is possible for small UDP packets.
- //
- throw new Ice.IllegalMessageSizeException();
- }
- stream.pos(0);
- byte[] m = new byte[4];
- m[0] = stream.readByte();
- m[1] = stream.readByte();
- m[2] = stream.readByte();
- m[3] = stream.readByte();
- if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1]
- || m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
- {
- Ice.BadMagicException ex = new Ice.BadMagicException();
- ex.badMagic = m;
- throw ex;
}
-
- byte pMajor = stream.readByte();
- byte pMinor = stream.readByte();
- if(pMajor != Protocol.protocolMajor || pMinor > Protocol.protocolMinor)
- {
- Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException();
- e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor;
- e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor;
- e.major = Protocol.protocolMajor;
- e.minor = Protocol.protocolMinor;
- throw e;
- }
-
- byte eMajor = stream.readByte();
- byte eMinor = stream.readByte();
- if(eMajor != Protocol.encodingMajor || eMinor > Protocol.encodingMinor)
- {
- Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException();
- e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor;
- e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor;
- e.major = Protocol.encodingMajor;
- e.minor = Protocol.encodingMinor;
- throw e;
- }
-
- stream.readByte(); // messageType
- stream.readByte(); // compress
- int size = stream.readInt();
- if(size < Protocol.headerSize)
- {
- throw new Ice.IllegalMessageSizeException();
- }
- if(size > _instance.messageSizeMax())
- {
- Ex.throwMemoryLimitException(size, _instance.messageSizeMax());
- }
- if(size > stream.size())
- {
- stream.resize(size, true);
- }
- stream.pos(pos);
-
- if(stream.pos() != stream.size())
+ }
+
+ private synchronized void
+ promoteFollower(ThreadPoolCurrent current)
+ {
+ assert(!_promote && current._leader);
+ _promote = true;
+ if(_inUseIO < _sizeIO && (_nextHandler.hasNext() || _inUseIO == 0))
{
- if(handler.datagram())
- {
- if(_warnUdp)
- {
- _instance.initializationData().logger.warning("DatagramLimitException: maximum size of "
- + stream.pos() + " exceeded");
- }
- throw new Ice.DatagramLimitException();
- }
- else
- {
- if(!handler.read(stream))
- {
- return false;
- }
- assert(stream.pos() == stream.size());
- }
+ notify();
}
-
- return true;
+ current._leader = false;
}
-/*
- * Commented out because it is unused.
- *
- private void
- selectNonBlocking()
+ private synchronized boolean
+ followerWait(EventHandlerThread thread, ThreadPoolCurrent current)
{
- while(true)
+ assert(!current._leader);
+
+ //
+ // It's important to clear the handler before waiting to make sure that
+ // resources for the handler are released now if it's finished. We also
+ // clear the per-thread stream.
+ //
+ current._handler = null;
+ current.stream.reset();
+
+ //
+ // Wait to be promoted and for all the IO threads to be done.
+ //
+ while(!_promote || _inUseIO == _sizeIO || _nextHandler.hasNext() && _inUseIO > 0)
{
- try
+ while(true)
{
- if(TRACE_SELECT)
- {
- trace("non-blocking select on " + _selector.keys().size() + " keys, thread id = " +
- Thread.currentThread());
- }
-
- _selector.selectNow();
-
- if(TRACE_SELECT)
+ try
{
- if(_keys.size() > 0)
+ if(_threadIdleTime > 0)
{
- trace("after selectNow, there are " + _keys.size() + " selected keys:");
- for(java.nio.channels.SelectionKey key : _keys)
+ long before = IceInternal.Time.currentMonotonicTimeMillis();
+ wait(_threadIdleTime * 1000);
+ if(IceInternal.Time.currentMonotonicTimeMillis() - before >= _threadIdleTime * 1000)
{
- trace(" " + keyToString(key));
+ if(_instance.traceLevels().threadPool >= 1)
+ {
+ String s = "shrinking " + _prefix + ": Size=" + (_threads.size() - 1);
+ _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
+ }
+ assert(_threads.size() > 1); // Can only be called by a waiting follower thread.
+ _threads.remove(thread);
+ _workQueue.queue(new JoinThreadWorkItem(thread));
+ return true;
}
}
- }
+ else
+ {
+ wait();
+ }
- break;
- }
- catch(java.io.InterruptedIOException ex)
- {
- continue;
- }
- catch(java.io.IOException ex)
- {
- //
- // Pressing Ctrl-C causes select() to raise an
- // IOException, which seems like a JDK bug. We trap
- // for that special case here and ignore it.
- // Hopefully we're not masking something important!
- //
- if(ex.getMessage().indexOf("Interrupted system call") != -1)
+ break;
+ }
+ catch(InterruptedException ex)
{
- continue;
}
-
- Ice.SocketException se = new Ice.SocketException();
- se.initCause(ex);
- //throw se;
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- se.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "':\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- continue;
}
}
- }
-*/
-
- private void
- trace(String msg)
- {
- System.err.println(_prefix + ": " + msg);
- }
-
- private String
- keyToString(java.nio.channels.SelectionKey key)
- {
- String ops = "[";
- if(key.isAcceptable())
- {
- ops += " OP_ACCEPT";
- }
- if(key.isReadable())
- {
- ops += " OP_READ";
- }
- if(key.isConnectable())
- {
- ops += " OP_CONNECT";
- }
- if(key.isWritable())
- {
- ops += " OP_WRITE";
- }
- ops += " ]";
- return key.channel() + " " + ops;
+ current._leader = true; // The current thread has become the leader.
+ _promote = false;
+ return false;
}
- private Instance _instance;
+ private final Instance _instance;
+ private final ThreadPoolWorkQueue _workQueue;
private boolean _destroyed;
private final String _prefix;
- private final String _programNamePrefix;
+ private final String _threadPrefix;
private final Selector _selector;
- private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>();
- private java.util.LinkedList<EventHandler> _finished = new java.util.LinkedList<EventHandler>();
- private int _timeout;
private final class EventHandlerThread implements Runnable
{
@@ -992,28 +598,18 @@ public final class ThreadPool
_name = name;
}
- public boolean
- isAlive()
- {
- return _thread.isAlive();
- }
-
- public boolean
- join(boolean wait)
+ public void
+ join()
{
while(true)
{
try
{
_thread.join();
- return true;
+ break;
}
catch(InterruptedException ex)
{
- if(!wait)
- {
- return false;
- }
}
}
}
@@ -1031,73 +627,65 @@ public final class ThreadPool
{
if(_instance.initializationData().threadHook != null)
{
- _instance.initializationData().threadHook.start();
+ try
+ {
+ _instance.initializationData().threadHook.start();
+ }
+ catch(java.lang.Exception ex)
+ {
+ String s = "thread hook start() method raised an unexpected exception in `";
+ s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
+ _instance.initializationData().logger.error(s);
+ }
}
- BasicStream stream = new BasicStream(_instance);
-
- boolean promote;
-
try
{
- promote = ThreadPool.this.run(stream);
+ ThreadPool.this.run(this);
}
catch(java.lang.Exception ex)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "exception in `" + _prefix + "' thread " + _name + ":\n" + sw.toString();
+ String s = "exception in `" + _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
_instance.initializationData().logger.error(s);
- promote = true;
}
- if(promote && _sizeMax > 1)
+ if(_instance.initializationData().threadHook != null)
{
- //
- // Promote a follower, but w/o modifying _inUse or
- // creating new threads.
- //
- synchronized(ThreadPool.this)
+ try
{
- assert(!_promote);
- _promote = true;
- ThreadPool.this.notify();
+ _instance.initializationData().threadHook.stop();
+ }
+ catch(java.lang.Exception ex)
+ {
+ String s = "thread hook stop() method raised an unexpected exception in `";
+ s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
+ _instance.initializationData().logger.error(s);
}
- }
-
- if(TRACE_THREAD)
- {
- trace("run() terminated");
- }
-
- if(_instance.initializationData().threadHook != null)
- {
- _instance.initializationData().threadHook.stop();
}
}
- private String _name;
+ final private String _name;
private Thread _thread;
}
private final int _size; // Number of threads that are pre-created.
+ private final int _sizeIO; // Number of threads that can concurrently perform IO.
private final int _sizeMax; // Maximum number of threads.
private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.
private final boolean _serialize; // True if requests need to be serialized over the connection.
-
+ private final int _priority;
+ private final boolean _hasPriority;
+ private final long _serverIdleTime;
+ private final long _threadIdleTime;
private final int _stackSize;
- private java.util.List<EventHandlerThread> _threads; // All threads, running or not.
+ private java.util.List<EventHandlerThread> _threads = new java.util.ArrayList<EventHandlerThread>();
private int _threadIndex; // For assigning thread names.
- private int _running; // Number of running threads.
private int _inUse; // Number of threads that are currently in use.
- private double _load; // Current load in number of threads.
+ private int _inUseIO; // Number of threads that are currently performing IO.
- private boolean _promote;
+ private java.util.List<EventHandler> _handlers = new java.util.ArrayList<EventHandler>();
+ private java.util.Iterator<EventHandler> _nextHandler;
- private final boolean _warnUdp;
- private int _priority;
- private boolean _hasPriority = false;
+ private boolean _promote;
}
diff --git a/java/src/IceInternal/ThreadPoolCurrent.java b/java/src/IceInternal/ThreadPoolCurrent.java
new file mode 100644
index 00000000000..6e12af421ad
--- /dev/null
+++ b/java/src/IceInternal/ThreadPoolCurrent.java
@@ -0,0 +1,37 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+public final class ThreadPoolCurrent
+{
+ ThreadPoolCurrent(Instance instance, ThreadPool threadPool)
+ {
+ operation = SocketOperation.None;
+ stream = new BasicStream(instance);
+
+ _threadPool = threadPool;
+ _ioCompleted = false;
+ _leader = false;
+ }
+
+ public int operation;
+ public BasicStream stream; // A per-thread stream to be used by event handlers for optimization.
+
+ public void
+ ioCompleted()
+ {
+ _threadPool.ioCompleted(this);
+ }
+
+ final ThreadPool _threadPool;
+ EventHandler _handler;
+ boolean _ioCompleted;
+ boolean _leader;
+}
diff --git a/java/src/IceInternal/ThreadPoolWorkItem.java b/java/src/IceInternal/ThreadPoolWorkItem.java
index f45cf1af4bc..3aef961990f 100644
--- a/java/src/IceInternal/ThreadPoolWorkItem.java
+++ b/java/src/IceInternal/ThreadPoolWorkItem.java
@@ -11,5 +11,5 @@ package IceInternal;
public interface ThreadPoolWorkItem
{
- void execute(ThreadPool threadPool);
+ void execute(ThreadPoolCurrent current);
}
diff --git a/java/src/IceInternal/ThreadPoolWorkQueue.java b/java/src/IceInternal/ThreadPoolWorkQueue.java
new file mode 100644
index 00000000000..3157cd8a6d0
--- /dev/null
+++ b/java/src/IceInternal/ThreadPoolWorkQueue.java
@@ -0,0 +1,182 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+final class ThreadPoolWorkQueue extends EventHandler
+{
+ ThreadPoolWorkQueue(ThreadPool threadPool, Instance instance, Selector selector)
+ {
+ _threadPool = threadPool;
+ _instance = instance;
+ _selector = selector;
+ _destroyed = false;
+
+ Network.SocketPair pair = Network.createPipe();
+ _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
+ _fdIntrWrite = pair.sink;
+ try
+ {
+ pair.source.configureBlocking(false);
+ }
+ catch(java.io.IOException ex)
+ {
+ Ice.SyscallException sys = new Ice.SyscallException();
+ sys.initCause(ex);
+ throw sys;
+ }
+
+ _selector.initialize(this);
+ _selector.update(this, SocketOperation.None, SocketOperation.Read);
+ }
+
+ protected synchronized void
+ finalize()
+ throws Throwable
+ {
+ IceUtilInternal.Assert.FinalizerAssert(_destroyed);
+ }
+
+ public synchronized void
+ close()
+ {
+ try
+ {
+ _fdIntrWrite.close();
+ }
+ catch(java.io.IOException ex)
+ {
+ }
+ _fdIntrWrite = null;
+
+ try
+ {
+ _fdIntrRead.close();
+ }
+ catch(java.io.IOException ex)
+ {
+ }
+ _fdIntrRead = null;
+ }
+
+ public synchronized
+ void destroy()
+ {
+ assert(!_destroyed);
+ _destroyed = true;
+ postMessage();
+ }
+
+ public synchronized void
+ queue(ThreadPoolWorkItem item)
+ {
+ if(_destroyed)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+ _workItems.add(item);
+ postMessage();
+ }
+
+ public void
+ message(ThreadPoolCurrent current)
+ {
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
+ try
+ {
+ buf.rewind();
+ int ret = _fdIntrRead.read(buf);
+ assert(ret > 0);
+ }
+ catch(java.io.IOException ex)
+ {
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
+ }
+
+ ThreadPoolWorkItem workItem = null;
+ synchronized(this)
+ {
+ if(!_workItems.isEmpty())
+ {
+ workItem = _workItems.removeFirst();
+ }
+ else
+ {
+ assert(_destroyed);
+ postMessage();
+ }
+ }
+
+ if(workItem != null)
+ {
+ workItem.execute(current);
+ }
+ else
+ {
+ _threadPool.ioCompleted(current);
+ throw new ThreadPool.DestroyedException();
+ }
+ }
+
+ public void
+ finished(ThreadPoolCurrent current)
+ {
+ assert(false);
+ }
+
+ public String
+ toString()
+ {
+ return "work queue";
+ }
+
+ public java.nio.channels.SelectableChannel
+ fd()
+ {
+ return (java.nio.channels.SelectableChannel)_fdIntrRead;
+ }
+
+ public boolean
+ hasMoreData()
+ {
+ return false;
+ }
+
+ public void
+ postMessage()
+ {
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
+ buf.put(0, (byte)0);
+ while(buf.hasRemaining())
+ {
+ try
+ {
+ _fdIntrWrite.write(buf);
+ }
+ catch(java.io.IOException ex)
+ {
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
+ }
+ }
+ }
+
+ private final ThreadPool _threadPool;
+ private final Instance _instance;
+ private final Selector _selector;
+ boolean _destroyed;
+
+ private java.nio.channels.ReadableByteChannel _fdIntrRead;
+ private java.nio.channels.WritableByteChannel _fdIntrWrite;
+
+ private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>();
+} \ No newline at end of file
diff --git a/java/src/IceInternal/Timer.java b/java/src/IceInternal/Timer.java
index c7840bd339a..d907dd827b9 100644
--- a/java/src/IceInternal/Timer.java
+++ b/java/src/IceInternal/Timer.java
@@ -235,11 +235,7 @@ public final class Timer extends Thread
{
if(_instance != null)
{
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "unexpected exception from task run method in timer thread:\n" + sw.toString();
+ String s = "unexpected exception from task run method in timer thread:\n" + Ex.toString(ex);
_instance.initializationData().logger.error(s);
}
}
diff --git a/java/src/IceInternal/Transceiver.java b/java/src/IceInternal/Transceiver.java
index ca3f028dc73..6cecdd4cab5 100644
--- a/java/src/IceInternal/Transceiver.java
+++ b/java/src/IceInternal/Transceiver.java
@@ -16,15 +16,9 @@ public interface Transceiver
//
// Initialize the transceiver.
//
- // Returns the status if the initialize operation. If timeout is != 0,
- // the status will always be SocketStatus.Finished. If timeout is 0,
- // the operation won't block and will return SocketStatus.NeedRead or
- // SocketStatus.NeedWrite if the initialization couldn't be completed
- // without blocking. This operation should be called again once the
- // socket is ready for reading or writing and until it returns
- // SocketStatus.Finished.
- //
- SocketStatus initialize();
+ // Returns the status if the initialize operation.
+ //
+ int initialize();
void close();
diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java
index 1dec5fde575..8a5cd336ad8 100644
--- a/java/src/IceInternal/UdpTransceiver.java
+++ b/java/src/IceInternal/UdpTransceiver.java
@@ -18,13 +18,13 @@ final class UdpTransceiver implements Transceiver
return _fd;
}
- public SocketStatus
+ public int
initialize()
{
//
// Nothing to do.
//
- return SocketStatus.Finished;
+ return SocketOperation.None;
}
public void
@@ -32,7 +32,7 @@ final class UdpTransceiver implements Transceiver
{
assert(_fd != null);
- if(_traceLevels.network >= 1)
+ if(!_connect && _traceLevels.network >= 1)
{
String s = "closing udp connection\n" + toString();
_logger.trace(_traceLevels.networkCat, s);
diff --git a/java/src/IceSSL/TransceiverI.java b/java/src/IceSSL/TransceiverI.java
index e1e7e099cc3..60a434e7719 100644
--- a/java/src/IceSSL/TransceiverI.java
+++ b/java/src/IceSSL/TransceiverI.java
@@ -22,7 +22,7 @@ final class TransceiverI implements IceInternal.Transceiver
return _fd;
}
- public IceInternal.SocketStatus
+ public int
initialize()
{
try
@@ -30,7 +30,7 @@ final class TransceiverI implements IceInternal.Transceiver
if(_state == StateNeedConnect)
{
_state = StateConnectPending;
- return IceInternal.SocketStatus.NeedConnect;
+ return IceInternal.SocketOperation.Connect;
}
else if(_state <= StateConnectPending)
{
@@ -40,8 +40,8 @@ final class TransceiverI implements IceInternal.Transceiver
}
assert(_state == StateConnected);
- IceInternal.SocketStatus status = handshakeNonBlocking();
- if(status != IceInternal.SocketStatus.Finished)
+ int status = handshakeNonBlocking();
+ if(status != IceInternal.SocketOperation.None)
{
return status;
}
@@ -56,13 +56,13 @@ final class TransceiverI implements IceInternal.Transceiver
throw ex;
}
- return IceInternal.SocketStatus.Finished;
+ return IceInternal.SocketOperation.None;
}
public void
close()
{
- if(_instance.networkTraceLevel() >= 1)
+ if(_state == StateHandshakeComplete && _instance.networkTraceLevel() >= 1)
{
String s = "closing ssl connection\n" + toString();
_logger.trace(_instance.networkTraceCategory(), s);
@@ -147,10 +147,10 @@ final class TransceiverI implements IceInternal.Transceiver
throw new Ice.ConnectionLostException();
}
- IceInternal.SocketStatus status = writeNonBlocking(buf.b);
- if(status != IceInternal.SocketStatus.Finished)
+ int status = writeNonBlocking(buf.b);
+ if(status != IceInternal.SocketOperation.None)
{
- assert(status == IceInternal.SocketStatus.NeedWrite);
+ assert(status == IceInternal.SocketOperation.Write);
return false;
}
return true;
@@ -213,10 +213,10 @@ final class TransceiverI implements IceInternal.Transceiver
}
case BUFFER_UNDERFLOW:
{
- IceInternal.SocketStatus status = readNonBlocking();
- if(status != IceInternal.SocketStatus.Finished)
+ int status = readNonBlocking();
+ if(status != IceInternal.SocketOperation.None)
{
- assert(status == IceInternal.SocketStatus.NeedRead);
+ assert(status == IceInternal.SocketOperation.Read);
moreData.value = false;
return false;
}
@@ -345,7 +345,7 @@ final class TransceiverI implements IceInternal.Transceiver
super.finalize();
}
- private IceInternal.SocketStatus
+ private int
handshakeNonBlocking()
{
try
@@ -391,8 +391,8 @@ final class TransceiverI implements IceInternal.Transceiver
case BUFFER_UNDERFLOW:
{
assert(status == javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
- IceInternal.SocketStatus ss = readNonBlocking();
- if(ss != IceInternal.SocketStatus.Finished)
+ int ss = readNonBlocking();
+ if(ss != IceInternal.SocketOperation.None)
{
return ss;
}
@@ -413,8 +413,8 @@ final class TransceiverI implements IceInternal.Transceiver
result = _engine.wrap(_emptyBuffer, _netOutput);
if(result.bytesProduced() > 0)
{
- IceInternal.SocketStatus ss = flushNonBlocking();
- if(ss != IceInternal.SocketStatus.Finished)
+ int ss = flushNonBlocking();
+ if(ss != IceInternal.SocketOperation.None)
{
return ss;
}
@@ -457,7 +457,7 @@ final class TransceiverI implements IceInternal.Transceiver
throw e;
}
- return IceInternal.SocketStatus.Finished;
+ return IceInternal.SocketOperation.None;
}
private void
@@ -515,7 +515,7 @@ final class TransceiverI implements IceInternal.Transceiver
}
}
- private IceInternal.SocketStatus
+ private int
writeNonBlocking(ByteBuffer buf)
{
//
@@ -578,10 +578,10 @@ final class TransceiverI implements IceInternal.Transceiver
//
if(_netOutput.position() > 0)
{
- IceInternal.SocketStatus ss = flushNonBlocking();
- if(ss != IceInternal.SocketStatus.Finished)
+ int ss = flushNonBlocking();
+ if(ss != IceInternal.SocketOperation.None)
{
- assert(ss == IceInternal.SocketStatus.NeedWrite);
+ assert(ss == IceInternal.SocketOperation.Write);
return ss;
}
}
@@ -596,10 +596,10 @@ final class TransceiverI implements IceInternal.Transceiver
}
assert(_netOutput.position() == 0);
- return IceInternal.SocketStatus.Finished;
+ return IceInternal.SocketOperation.None;
}
- private IceInternal.SocketStatus
+ private int
flushNonBlocking()
{
_netOutput.flip();
@@ -612,7 +612,7 @@ final class TransceiverI implements IceInternal.Transceiver
_netOutput.limit(_netOutput.position() + packetSize);
}
- IceInternal.SocketStatus status = IceInternal.SocketStatus.Finished;
+ int status = IceInternal.SocketOperation.None;
while(_netOutput.hasRemaining())
{
try
@@ -626,7 +626,7 @@ final class TransceiverI implements IceInternal.Transceiver
}
else if(ret == 0)
{
- status = IceInternal.SocketStatus.NeedWrite;
+ status = IceInternal.SocketOperation.Write;
break;
}
@@ -653,7 +653,7 @@ final class TransceiverI implements IceInternal.Transceiver
}
}
- if(status == IceInternal.SocketStatus.Finished)
+ if(status == IceInternal.SocketOperation.None)
{
_netOutput.clear();
}
@@ -666,7 +666,7 @@ final class TransceiverI implements IceInternal.Transceiver
return status;
}
- private IceInternal.SocketStatus
+ private int
readNonBlocking()
{
while(true)
@@ -682,7 +682,7 @@ final class TransceiverI implements IceInternal.Transceiver
}
else if(ret == 0)
{
- return IceInternal.SocketStatus.NeedRead;
+ return IceInternal.SocketOperation.Read;
}
break;
@@ -699,7 +699,7 @@ final class TransceiverI implements IceInternal.Transceiver
}
}
- return IceInternal.SocketStatus.Finished;
+ return IceInternal.SocketOperation.None;
}
private void