diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-08-21 15:55:01 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-08-21 15:55:01 +0200 |
commit | b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a (patch) | |
tree | 183215e2dbeadfbc871b800ce09726e58af38b91 /java/src | |
parent | adding compression cookbook demo (diff) | |
download | ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.bz2 ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.xz ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.zip |
IOCP changes, bug 3501, 4200, 4156, 3101
Diffstat (limited to 'java/src')
30 files changed, 1639 insertions, 2363 deletions
diff --git a/java/src/Ice/Application.java b/java/src/Ice/Application.java index 3bb0cb74205..01b42196f8c 100644 --- a/java/src/Ice/Application.java +++ b/java/src/Ice/Application.java @@ -106,12 +106,12 @@ public abstract class Application } catch(LocalException ex) { - error("", ex); + Util.getProcessLogger().error(IceInternal.Ex.toString(ex)); return 1; } catch(java.lang.Exception ex) { - error("unknown exception", ex); + Util.getProcessLogger().error("unknown exception: " + IceInternal.Ex.toString(ex)); return 1; } } @@ -195,12 +195,12 @@ public abstract class Application } catch(LocalException ex) { - error("", ex); + Util.getProcessLogger().error(IceInternal.Ex.toString(ex)); status = 1; } catch(java.lang.Exception ex) { - error("unknown exception", ex); + Util.getProcessLogger().error("unknown exception: " + IceInternal.Ex.toString(ex)); status = 1; } catch(java.lang.Error err) @@ -208,7 +208,7 @@ public abstract class Application // // We catch Error to avoid hangs in some non-fatal situations // - error("Java error", err); + Util.getProcessLogger().error("Java error: " + IceInternal.Ex.toString(err)); status = 1; } @@ -254,12 +254,12 @@ public abstract class Application } catch(LocalException ex) { - error("", ex); + Util.getProcessLogger().error(IceInternal.Ex.toString(ex)); status = 1; } catch(java.lang.Exception ex) { - error("unknown exception", ex); + Util.getProcessLogger().error("unknown exception: " + IceInternal.Ex.toString(ex)); status = 1; } _communicator = null; @@ -642,23 +642,6 @@ public abstract class Application private Thread _hook; } - private void - error(String msg, java.lang.Throwable ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - if(msg.equals("")) - { - Util.getProcessLogger().error(sw.toString()); - } - else - { - Util.getProcessLogger().error(msg + ":\n" + sw.toString()); - } - } - private static String _appName; private static Communicator _communicator; private static AppHook _appHook; diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 4107621f428..d00ce4499a6 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -17,6 +17,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne void connectionStartFailed(ConnectionI connection, Ice.LocalException ex); } + private class TimeoutCallback implements IceInternal.TimerTask + { + public void + runTimerTask() + { + timedOut(); + } + } + public void start(StartCallback callback) { @@ -24,44 +33,23 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { synchronized(this) { - if(_state == StateClosed) // The connection might already be closed if the communicator was destroyed. + if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed. { assert(_exception != null); throw _exception; } - IceInternal.SocketStatus status = initialize(); - if(status == IceInternal.SocketStatus.Finished) + if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None)) { - status = validate(); - } - - if(status != IceInternal.SocketStatus.Finished) - { - // - // If the initialization or validation couldn't be completed without potentially - // blocking, we register the connection with the selector thread and return. - // - int timeout; - IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideConnectTimeout) - { - timeout = defaultsAndOverrides.overrideConnectTimeoutValue; - } - else - { - timeout = _endpoint.timeout(); - } - - _sendInProgress = true; - _selectorThread._register(_socketReadyCallback, status, timeout); - if(callback != null) { _startCallback = callback; return; } + // + // Wait for the connection to be validated. + // while(_state <= StateNotValidated) { try @@ -79,6 +67,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne throw _exception; } } + + // + // We start out in holding state. + // + setState(StateHolding); } } catch(Ice.LocalException ex) @@ -192,12 +185,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public synchronized boolean isFinished() { - if(_transceiver != null || _dispatchCount != 0) + if(_state != StateFinished || _dispatchCount != 0) { return false; } - assert(_state == StateClosed); + assert(_state == StateFinished); return true; } @@ -251,7 +244,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // Now we must wait until close() has been called on the // transceiver. // - while(_transceiver != null) + while(_state != StateFinished) { try { @@ -296,7 +289,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - assert(_state == StateClosed); + assert(_state == StateFinished); // // Clear the OA. See bug 1673 for the details of why this is necessary. @@ -316,9 +309,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // Active connection management for idle connections. // if(_acmTimeout <= 0 || - !_requests.isEmpty() || !_asyncRequests.isEmpty() || - _batchStreamInUse || !_batchStream.isEmpty() || - _sendInProgress || _dispatchCount > 0) + !_requests.isEmpty() || !_asyncRequests.isEmpty() || _dispatchCount > 0 || + _readStream.size() > IceInternal.Protocol.headerSize || !_writeStream.isEmpty() || !_batchStream.isEmpty()) { return; } @@ -786,7 +778,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne notifyAll(); } - if(_state == StateClosed) + if(_state >= StateClosed) { assert(_exception != null); throw _exception; @@ -798,11 +790,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { initiateShutdown(); } - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; - } } catch(LocalException ex) { @@ -821,7 +808,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne notifyAll(); } - if(_state == StateClosed) + if(_state >= StateClosed) { assert(_exception != null); throw _exception; @@ -831,11 +818,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { initiateShutdown(); } - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; - } } catch(LocalException ex) { @@ -897,103 +879,269 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } // - // Operations from SelectorHandler + // Operations from EventHandler // - public java.nio.channels.SelectableChannel - fd() + public void + message(IceInternal.ThreadPoolCurrent current) { - return _transceiver.fd(); - } + StartCallback startCB = null; + java.util.List<OutgoingMessage> sentCBs = null; + MessageInfo info = null; - public boolean - hasMoreData() - { - return _hasMoreData.value; - } + synchronized(this) + { + if(_state >= StateClosed) + { + return; + } - // - // Operations from EventHandler - // + try + { + unscheduleTimeout(current.operation); + if((current.operation & IceInternal.SocketOperation.Write) != 0 && !_writeStream.isEmpty()) + { + if(!_transceiver.write(_writeStream.getBuffer())) + { + assert(!_writeStream.isEmpty()); + scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); + return; + } + assert(!_writeStream.getBuffer().b.hasRemaining()); + } + if((current.operation & IceInternal.SocketOperation.Read) != 0 && !_readStream.isEmpty()) + { + if(_readStream.size() == IceInternal.Protocol.headerSize) // Read header. + { + if(!_transceiver.read(_readStream.getBuffer(), _hasMoreData)) + { + return; + } + assert(!_readStream.getBuffer().b.hasRemaining()); - public boolean - datagram() - { - return _endpoint.datagram(); // No mutex protection necessary, _endpoint is immutable. - } + int pos = _readStream.pos(); + if(pos < IceInternal.Protocol.headerSize) + { + // + // This situation is possible for small UDP packets. + // + throw new Ice.IllegalMessageSizeException(); + } - public boolean - readable() - { - return true; - } + _readStream.pos(0); + byte[] m = new byte[4]; + m[0] = _readStream.readByte(); + m[1] = _readStream.readByte(); + m[2] = _readStream.readByte(); + m[3] = _readStream.readByte(); + if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] + || m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) + { + Ice.BadMagicException ex = new Ice.BadMagicException(); + ex.badMagic = m; + throw ex; + } - public boolean - read(IceInternal.BasicStream stream) - { - assert(_transceiver != null); - return _transceiver.read(stream.getBuffer(), _hasMoreData); + byte pMajor = _readStream.readByte(); + byte pMinor = _readStream.readByte(); + if(pMajor != IceInternal.Protocol.protocolMajor || pMinor > IceInternal.Protocol.protocolMinor) + { + Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException(); + e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor; + e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor; + e.major = IceInternal.Protocol.protocolMajor; + e.minor = IceInternal.Protocol.protocolMinor; + throw e; + } - // - // Updating _acmAbsoluteTimeoutMillis is too expensive here, - // because we would have to acquire a lock just for this - // purpose. Instead, we update _acmAbsoluteTimeoutMillis in - // message(). - // - } + byte eMajor = _readStream.readByte(); + byte eMinor = _readStream.readByte(); + if(eMajor != IceInternal.Protocol.encodingMajor || eMinor > IceInternal.Protocol.encodingMinor) + { + Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException(); + e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor; + e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor; + e.major = IceInternal.Protocol.encodingMajor; + e.minor = IceInternal.Protocol.encodingMinor; + throw e; + } - public void - message(IceInternal.BasicStream stream, IceInternal.ThreadPool threadPool) - { - MessageInfo info = new MessageInfo(stream); + _readStream.readByte(); // messageType + _readStream.readByte(); // compress + int size = _readStream.readInt(); + if(size < IceInternal.Protocol.headerSize) + { + throw new Ice.IllegalMessageSizeException(); + } + if(size > _instance.messageSizeMax()) + { + IceInternal.Ex.throwMemoryLimitException(size, _instance.messageSizeMax()); + } + if(size > _readStream.size()) + { + _readStream.resize(size, true); + } + _readStream.pos(pos); + } - synchronized(this) - { - // - // We must promote within the synchronization, otherwise - // there could be various race conditions with close - // connection messages and other messages. - // - threadPool.promoteFollower(this); + if(_readStream.pos() != _readStream.size()) + { + if(_endpoint.datagram()) + { + if(_warnUdp) + { + _logger.warning("DatagramLimitException: maximum size of " + _readStream.pos() + + " exceeded"); + } + throw new Ice.DatagramLimitException(); + } + else + { + if(!_transceiver.read(_readStream.getBuffer(), _hasMoreData)) + { + assert(!_readStream.isEmpty()); + scheduleTimeout(IceInternal.SocketOperation.Read, _endpoint.timeout()); + return; + } + assert(!_readStream.getBuffer().b.hasRemaining()); + } + } + } + + if(_state <= StateNotValidated) + { + if(_state == StateNotInitialized && !initialize(current.operation)) + { + return; + } + + if(_state <= StateNotValidated && !validate(current.operation)) + { + return; + } + + _threadPool.unregister(this, current.operation); - if(_state != StateClosed) + // + // We start out in holding state. + // + setState(StateHolding); + startCB = _startCallback; + _startCallback = null; + } + else + { + assert(_state <= StateClosing); + + if((current.operation & IceInternal.SocketOperation.Write) != 0) + { + sentCBs = sendNextMessage(); + } + + if((current.operation & IceInternal.SocketOperation.Read) != 0) + { + info = parseMessage(current.stream); + } + } + } + catch(DatagramLimitException ex) // Expected. { - parseMessage(info); + _readStream.resize(IceInternal.Protocol.headerSize, true); + _readStream.pos(0); + return; } - - // - // parseMessage() can close the connection, so we must check - // for closed state again. - // - if(_state == StateClosed) + catch(SocketException ex) { + setState(StateClosed, ex); return; } + catch(LocalException ex) + { + if(_endpoint.datagram()) + { + if(_warn) + { + String s = "datagram connection exception:\n" + ex + '\n' + _desc; + _logger.warning(s); + } + _readStream.resize(IceInternal.Protocol.headerSize, true); + _readStream.pos(0); + } + else + { + setState(StateClosed, ex); + } + return; + } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; + } + + current.ioCompleted(); } // - // Asynchronous replies must be handled outside the thread - // synchronization, so that nested calls are possible. + // Notify the factory that the connection establishment and + // validation has completed. // - if(info.outAsync != null) + if(startCB != null) { - info.outAsync.__finished(info.stream); + startCB.connectionStartCompleted(this); } // - // Method invocation (or multiple invocations for batch messages) - // must be done outside the thread synchronization, so that nested - // calls are possible. + // Notify AMI calls that the message was sent. // - invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter); + if(sentCBs != null) + { + for(OutgoingMessage msg : sentCBs) + { + msg.outAsync.__sent(_instance); + } + } + + if(info != null) + { + // + // Asynchronous replies must be handled outside the thread + // synchronization, so that nested calls are possible. + // + if(info.outAsync != null) + { + info.outAsync.__finished(info.stream); + } + + if(info.invokeNum > 0) + { + // + // Method invocation (or multiple invocations for batch messages) + // must be done outside the thread synchronization, so that nested + // calls are possible. + // + invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, + info.adapter); + } + } } public void - finished(IceInternal.ThreadPool threadPool) + finished(IceInternal.ThreadPoolCurrent current) { synchronized(this) { - assert(threadPool == _threadPool && _state == StateClosed && !_sendInProgress); - threadPool.promoteFollower(null); + assert(_state == StateClosed); + unscheduleTimeout(IceInternal.SocketOperation.Read | IceInternal.SocketOperation.Write); + } + + // + // If there are no callbacks to call, we don't call ioCompleted() since we're not going + // to call code that will potentially block (this avoids promoting a new leader and + // unecessary thread creation, especially if this is called on shutdown). + // + if(_startCallback != null || !_sendStreams.isEmpty() || !_asyncRequests.isEmpty()) + { + current.ioCompleted(); } if(_startCallback != null) @@ -1026,43 +1174,38 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // synchronized(this) { - try - { - _transceiver.close(); - } - finally - { - _transceiver = null; - notifyAll(); - } + setState(StateFinished); } } - public synchronized void - exception(LocalException ex) + public String + toString() { - setState(StateClosed, ex); + return _toString(); } - public synchronized void - invokeException(LocalException ex, int invokeNum) + public java.nio.channels.SelectableChannel + fd() { - // - // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't - // called in case of a fatal exception we decrement _dispatchCount here. - // + return _transceiver.fd(); + } - setState(StateClosed, ex); + public boolean + hasMoreData() + { + return _hasMoreData.value; + } - if(invokeNum > 0) + public synchronized void + timedOut() + { + if(_state <= StateNotValidated) { - assert(_dispatchCount > 0); - _dispatchCount -= invokeNum; - assert(_dispatchCount >= 0); - if(_dispatchCount == 0) - { - notifyAll(); - } + setState(StateClosed, new ConnectTimeoutException()); + } + else if(_state <= StateClosing) + { + setState(StateClosed, new TimeoutException()); } } @@ -1078,10 +1221,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return _endpoint.timeout(); // No mutex protection necessary, _endpoint is immutable. } - public String - toString() + // + // Only used by the SSL plug-in. + // + // The external party has to synchronize the connection, since the + // connection is the object that protects the transceiver. + // + public IceInternal.Transceiver + getTransceiver() { - return _toString(); + return _transceiver; } public String @@ -1090,121 +1239,38 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return _desc; // No mutex lock, _desc is immutable. } - // - // Operations from SocketReadyCallback - // - public IceInternal.SocketStatus - socketReady() - { - StartCallback callback = null; - - synchronized(this) - { - assert(_sendInProgress); - - if(_state == StateClosed) - { - return IceInternal.SocketStatus.Finished; - } - - try - { - // - // First, we check if there's something to send. If that's the case, the connection - // must be active and the only thing to do is send the queued streams. - // - if(!_sendStreams.isEmpty()) - { - if(!send()) - { - return IceInternal.SocketStatus.NeedWrite; - } - assert(_sendStreams.isEmpty()); - } - else - { - if(_state == StateNotInitialized) - { - IceInternal.SocketStatus status = initialize(); - if(status != IceInternal.SocketStatus.Finished) - { - return status; - } - } - - if(_state <= StateNotValidated) - { - IceInternal.SocketStatus status = validate(); - if(status != IceInternal.SocketStatus.Finished) - { - return status; - } - } - - callback = _startCallback; - _startCallback = null; - } - } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - return IceInternal.SocketStatus.Finished; - } - - assert(_sendStreams.isEmpty()); - _selectorThread.unregister(_socketReadyCallback); - _sendInProgress = false; - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; - } - } - - if(callback != null) - { - callback.connectionStartCompleted(this); - } - return IceInternal.SocketStatus.Finished; - } - public synchronized void - socketFinished() + exception(LocalException ex) { - assert(_sendInProgress && _state == StateClosed); - _sendInProgress = false; - _threadPool.finish(this); + setState(StateClosed, ex); } public synchronized void - socketTimeout() + invokeException(LocalException ex, int invokeNum) { - if(_state <= StateNotValidated) - { - setState(StateClosed, new ConnectTimeoutException()); - } - else if(_state <= StateClosing) + // + // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't + // called in case of a fatal exception we decrement _dispatchCount here. + // + + setState(StateClosed, ex); + + if(invokeNum > 0) { - setState(StateClosed, new TimeoutException()); + assert(_dispatchCount > 0); + _dispatchCount -= invokeNum; + assert(_dispatchCount >= 0); + if(_dispatchCount == 0) + { + notifyAll(); + } } } - // - // Only used by the SSL plug-in. - // - // The external party has to synchronize the connection, since the - // connection is the object that protects the transceiver. - // - public IceInternal.Transceiver - getTransceiver() - { - return _transceiver; - } - public ConnectionI(IceInternal.Instance instance, IceInternal.Transceiver transceiver, IceInternal.EndpointI endpoint, ObjectAdapter adapter) { - super(instance); - + _instance = instance; final Ice.InitializationData initData = instance.initializationData(); _transceiver = transceiver; _desc = transceiver.toString(); @@ -1213,7 +1279,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _adapter = adapter; _logger = initData.logger; // Cached for better performance. _traceLevels = instance.traceLevels(); // Cached for better performance. - _warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; + _timer = instance.timer(); + _writeTimeout = new TimeoutCallback(); + _writeTimeoutScheduled = false; + _readTimeout = new TimeoutCallback(); + _readTimeoutScheduled = false; + _warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0; + _warnUdp = instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; _cacheBuffers = initData.properties.getPropertyAsIntWithDefault("Ice.CacheMessageBuffers", 1) == 1; _acmAbsoluteTimeoutMillis = 0; _nextRequestId = 1; @@ -1223,7 +1295,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _batchRequestNum = 0; _batchRequestCompress = false; _batchMarker = 0; - _sendInProgress = false; + _readStream = new IceInternal.BasicStream(instance); + _writeStream = new IceInternal.BasicStream(instance); _dispatchCount = 0; _state = StateNotInitialized; _stateTime = IceInternal.Time.currentMonotonicTimeMillis(); @@ -1274,11 +1347,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { _threadPool = _instance.clientThreadPool(); } - - _selectorThread = _instance.selectorThread(); - - _overrideCompress = _instance.defaultsAndOverrides().overrideCompress; - _overrideCompressValue = _instance.defaultsAndOverrides().overrideCompressValue; + _threadPool.initialize(this); } catch(Ice.LocalException ex) { @@ -1297,8 +1366,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne throws Throwable { IceUtilInternal.Assert.FinalizerAssert(_startCallback == null); - IceUtilInternal.Assert.FinalizerAssert(_state == StateClosed); - IceUtilInternal.Assert.FinalizerAssert(_transceiver == null); + IceUtilInternal.Assert.FinalizerAssert(_state == StateFinished); IceUtilInternal.Assert.FinalizerAssert(_dispatchCount == 0); IceUtilInternal.Assert.FinalizerAssert(_sendStreams.isEmpty()); IceUtilInternal.Assert.FinalizerAssert(_requests.isEmpty()); @@ -1313,6 +1381,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private static final int StateHolding = 3; private static final int StateClosing = 4; private static final int StateClosed = 5; + private static final int StateFinished = 6; private void setState(int state, LocalException ex) @@ -1321,7 +1390,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // If setState() is called with an exception, then only closed // and closing states are permissible. // - assert(state == StateClosing || state == StateClosed); + assert(state >= StateClosing); if(_state == state) // Don't switch twice. { @@ -1388,8 +1457,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return; } - switch(state) + try { + switch(state) + { case StateNotInitialized: { assert(false); @@ -1416,7 +1487,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { return; } - _threadPool._register(this); + _threadPool.register(this, IceInternal.SocketOperation.Read); break; } @@ -1430,7 +1501,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { return; } - _threadPool.unregister(this); + if(_state == StateActive) + { + _threadPool.unregister(this, IceInternal.SocketOperation.Read); + } break; } @@ -1439,34 +1513,44 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // Can't change back from closed. // - if(_state == StateClosed) + if(_state >= StateClosed) { return; } - _threadPool._register(this); + if(_state == StateHolding) + { + // We need to continue to read in closing state. + _threadPool.register(this, IceInternal.SocketOperation.Read); + } break; } case StateClosed: { - if(_sendInProgress) + if(_state == StateFinished) { - // - // Unregister with both the pool and the selector thread. We unregister with - // the pool to ensure that it stops reading on the socket (otherwise, if the - // socket is closed the thread pool would spin always reading 0 from the FD). - // The selector thread will register again the FD with the pool once it's - // done. - // - _selectorThread.finish(_socketReadyCallback); - _threadPool.unregister(this); - } - else - { - _threadPool.finish(this); + return; } + _threadPool.finish(this); + break; + } + + case StateFinished: + { + assert(_state == StateClosed); + _transceiver.close(); break; } + } + } + catch(Ice.LocalException ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "unexpected connection exception:\n " + _desc + "\n" + sw.toString(); + _instance.initializationData().logger.error(s); } // @@ -1542,13 +1626,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - private IceInternal.SocketStatus - initialize() + private boolean + initialize(int operation) { - IceInternal.SocketStatus status = _transceiver.initialize(); - if(status != IceInternal.SocketStatus.Finished) + int s = _transceiver.initialize(); + if(s != IceInternal.SocketOperation.None) { - return status; + scheduleTimeout(s, connectTimeout()); + _threadPool.update(this, operation, s); + return false; } // @@ -1556,53 +1642,55 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // _desc = _transceiver.toString(); setState(StateNotValidated); - return IceInternal.SocketStatus.Finished; + return true; } - private IceInternal.SocketStatus - validate() + private boolean + validate(int operation) { if(!_endpoint.datagram()) // Datagram connections are always implicitly validated. { if(_adapter != null) // The server side has the active role for connection validation. { - IceInternal.BasicStream os = _stream; - if(os.size() == 0) + if(_writeStream.size() == 0) { - os.writeBlob(IceInternal.Protocol.magic); - os.writeByte(IceInternal.Protocol.protocolMajor); - os.writeByte(IceInternal.Protocol.protocolMinor); - os.writeByte(IceInternal.Protocol.encodingMajor); - os.writeByte(IceInternal.Protocol.encodingMinor); - os.writeByte(IceInternal.Protocol.validateConnectionMsg); - os.writeByte((byte)0); // Compression status (always zero for validate connection). - os.writeInt(IceInternal.Protocol.headerSize); // Message size. - IceInternal.TraceUtil.traceSend(os, _logger, _traceLevels); - os.prepareWrite(); + _writeStream.writeBlob(IceInternal.Protocol.magic); + _writeStream.writeByte(IceInternal.Protocol.protocolMajor); + _writeStream.writeByte(IceInternal.Protocol.protocolMinor); + _writeStream.writeByte(IceInternal.Protocol.encodingMajor); + _writeStream.writeByte(IceInternal.Protocol.encodingMinor); + _writeStream.writeByte(IceInternal.Protocol.validateConnectionMsg); + _writeStream.writeByte((byte)0); // Compression status (always zero for validate connection). + _writeStream.writeInt(IceInternal.Protocol.headerSize); // Message size. + IceInternal.TraceUtil.traceSend(_writeStream, _logger, _traceLevels); + _writeStream.prepareWrite(); } - if(!_transceiver.write(os.getBuffer())) + if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer())) { - return IceInternal.SocketStatus.NeedWrite; + scheduleTimeout(IceInternal.SocketOperation.Write, connectTimeout()); + _threadPool.update(this, operation, IceInternal.SocketOperation.Write); + return false; } } else // The client side has the passive role for connection validation. { - IceInternal.BasicStream is = _stream; - if(is.size() == 0) + if(_readStream.size() == 0) { - is.resize(IceInternal.Protocol.headerSize, true); - is.pos(0); + _readStream.resize(IceInternal.Protocol.headerSize, true); + _readStream.pos(0); } - if(!_transceiver.read(is.getBuffer(), _hasMoreData)) + if(_readStream.pos() != _readStream.size() && !_transceiver.read(_readStream.getBuffer(), _hasMoreData)) { - return IceInternal.SocketStatus.NeedRead; + scheduleTimeout(IceInternal.SocketOperation.Read, connectTimeout()); + _threadPool.update(this, operation, IceInternal.SocketOperation.Read); + return false; } - assert(is.pos() == IceInternal.Protocol.headerSize); - is.pos(0); - byte[] m = is.readBlob(4); + assert(_readStream.pos() == IceInternal.Protocol.headerSize); + _readStream.pos(0); + byte[] m = _readStream.readBlob(4); if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) { @@ -1610,8 +1698,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne ex.badMagic = m; throw ex; } - byte pMajor = is.readByte(); - byte pMinor = is.readByte(); + byte pMajor = _readStream.readByte(); + byte pMinor = _readStream.readByte(); if(pMajor != IceInternal.Protocol.protocolMajor) { UnsupportedProtocolException e = new UnsupportedProtocolException(); @@ -1621,8 +1709,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne e.minor = IceInternal.Protocol.protocolMinor; throw e; } - byte eMajor = is.readByte(); - byte eMinor = is.readByte(); + byte eMajor = _readStream.readByte(); + byte eMinor = _readStream.readByte(); if(eMajor != IceInternal.Protocol.encodingMajor) { UnsupportedEncodingException e = new UnsupportedEncodingException(); @@ -1632,116 +1720,115 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne e.minor = IceInternal.Protocol.encodingMinor; throw e; } - byte messageType = is.readByte(); + byte messageType = _readStream.readByte(); if(messageType != IceInternal.Protocol.validateConnectionMsg) { throw new ConnectionNotValidatedException(); } - is.readByte(); // Ignore compression status for validate connection. - int size = is.readInt(); + _readStream.readByte(); // Ignore compression status for validate connection. + int size = _readStream.readInt(); if(size != IceInternal.Protocol.headerSize) { throw new IllegalMessageSizeException(); } - IceInternal.TraceUtil.traceRecv(is, _logger, _traceLevels); + IceInternal.TraceUtil.traceRecv(_readStream, _logger, _traceLevels); } } - _stream.reset(); + _writeStream.resize(0, false); + _writeStream.pos(0); - // - // We start out in holding state. - // - setState(StateHolding); - return IceInternal.SocketStatus.Finished; + _readStream.resize(IceInternal.Protocol.headerSize, true); + _readStream.pos(0); + + return true; } - private boolean - send() + private java.util.List<OutgoingMessage> + sendNextMessage() { - assert(_transceiver != null); assert(!_sendStreams.isEmpty()); + assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size()); - boolean flushSentCallbacks = _sentCallbacks.isEmpty(); + java.util.List<OutgoingMessage> callbacks = new java.util.LinkedList<OutgoingMessage>(); try { - while(!_sendStreams.isEmpty()) + while(true) { + // + // Notify the message that it was sent. + // OutgoingMessage message = _sendStreams.getFirst(); - if(!message.prepared) + _writeStream.swap(message.stream); + message.sent(this, true); + if(message.outAsync instanceof Ice.AMISentCallback) { - IceInternal.BasicStream stream = message.stream; - - boolean compress = _overrideCompress ? _overrideCompressValue : message.compress; - message.stream = doCompress(stream, compress); - message.stream.prepareWrite(); - message.prepared = true; - - if(message.outAsync != null) - { - IceInternal.TraceUtil.trace("sending asynchronous request", stream, _logger, _traceLevels); - } - else - { - IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels); - } - + callbacks.add(message); } + _sendStreams.removeFirst(); - if(!_transceiver.write(message.stream.getBuffer())) + // + // If there's nothing left to send, we're done. + // + if(_sendStreams.isEmpty()) { - return false; + break; } + + // + // Otherwise, prepare the next message stream for writing. + // + message = _sendStreams.getFirst(); + assert(!message.prepared); + IceInternal.BasicStream stream = message.stream; - message.sent(this, true); + message.stream = doCompress(stream, message.compress); + message.stream.prepareWrite(); + message.prepared = true; - if(message.outAsync instanceof Ice.AMISentCallback) + if(message.outAsync != null) + { + IceInternal.TraceUtil.trace("sending asynchronous request", stream, _logger, _traceLevels); + } + else { - _sentCallbacks.add(message); + IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels); } + _writeStream.swap(message.stream); - _sendStreams.removeFirst(); + // + // Send the message. + // + if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer())) + { + assert(!_writeStream.isEmpty()); + scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); + return callbacks; + } } } - finally + catch(Ice.LocalException ex) { - if(flushSentCallbacks && !_sentCallbacks.isEmpty()) - { - _threadPool.execute(_flushSentCallbacks); - } + setState(StateClosed, ex); + return callbacks; } - return true; - } - private void - flushSentCallbacks() - { - java.util.List<OutgoingMessage> sentCallbacks; - synchronized(this) - { - assert(_sentCallbacks != null && !_sentCallbacks.isEmpty()); - sentCallbacks = _sentCallbacks; - _sentCallbacks = new java.util.LinkedList<OutgoingMessage>(); - } - for(OutgoingMessage message : sentCallbacks) - { - message.outAsync.__sent(_instance); - } + assert(_writeStream.isEmpty()); + _threadPool.unregister(this, IceInternal.SocketOperation.Write); + return callbacks; } private boolean sendMessage(OutgoingMessage message) { - assert(_state != StateClosed); - if(_sendInProgress) + assert(_state < StateClosed); + if(!_sendStreams.isEmpty()) { message.adopt(); _sendStreams.addLast(message); return false; } - assert(!_sendInProgress); - // // Attempt to send the message without blocking. If the send blocks, we register // the connection with the selector thread or we request the caller to call @@ -1752,8 +1839,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceInternal.BasicStream stream = message.stream; - boolean compress = _overrideCompress ? _overrideCompressValue : message.compress; - message.stream = doCompress(stream, compress); + message.stream = doCompress(stream, message.compress); message.stream.prepareWrite(); message.prepared = true; @@ -1775,11 +1861,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } return true; } + message.adopt(); + _writeStream.swap(message.stream); _sendStreams.addLast(message); - _sendInProgress = true; - message.adopt(); - _selectorThread._register(_socketReadyCallback, IceInternal.SocketStatus.NeedWrite, _endpoint.timeout()); + scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); + _threadPool.register(this, IceInternal.SocketOperation.Write); return false; } @@ -1850,15 +1937,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceInternal.OutgoingAsync outAsync; } - private void - parseMessage(MessageInfo info) + private MessageInfo + parseMessage(IceInternal.BasicStream stream) { assert(_state > StateNotValidated && _state < StateClosed); - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; - } + MessageInfo info = new MessageInfo(stream); + + _readStream.swap(info.stream); + _readStream.resize(IceInternal.Protocol.headerSize, true); + _readStream.pos(0); + + assert(info.stream.pos() == info.stream.size()); try { @@ -1874,11 +1964,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_compressionSupported) { - IceInternal.BasicStream ustream = info.stream.uncompress(IceInternal.Protocol.headerSize); - if(ustream != info.stream) - { - info.stream = ustream; - } + info.stream = info.stream.uncompress(IceInternal.Protocol.headerSize); } else { @@ -1998,7 +2084,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_warn) { - _logger.warning("udp connection exception:\n" + ex + _desc); + _logger.warning("datagram connection exception:\n" + ex + '\n' + _desc); } } else @@ -2006,6 +2092,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne setState(StateClosed, ex); } } + + return info; } private void @@ -2095,6 +2183,55 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } + private void + scheduleTimeout(int status, int timeout) + { + if(timeout < 0) + { + return; + } + + if((status & IceInternal.SocketOperation.Read) != 0) + { + _timer.schedule(_readTimeout, timeout); + _readTimeoutScheduled = true; + } + if((status & IceInternal.SocketOperation.Write) != 0) + { + _timer.schedule(_writeTimeout, timeout); + _writeTimeoutScheduled = true; + } + } + + private void + unscheduleTimeout(int status) + { + if((status & IceInternal.SocketOperation.Read) != 0 && _readTimeoutScheduled) + { + _timer.cancel(_readTimeout); + _readTimeoutScheduled = false; + } + if((status & IceInternal.SocketOperation.Write) != 0 && _writeTimeoutScheduled) + { + _timer.cancel(_writeTimeout); + _writeTimeoutScheduled = false; + } + } + + private int + connectTimeout() + { + IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideConnectTimeout) + { + return defaultsAndOverrides.overrideConnectTimeoutValue; + } + else + { + return _endpoint.timeout(); + } + } + private void warning(String msg, Exception ex) { @@ -2285,69 +2422,31 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne boolean prepared; } - static class SocketReadyCallback extends IceInternal.SelectorThread.SocketReadyCallback - { - public - SocketReadyCallback(ConnectionI connection) - { - _connection = connection; - } - - public java.nio.channels.SelectableChannel - fd() - { - return _connection.fd(); - } - - public boolean - hasMoreData() - { - return _connection.hasMoreData(); - } - - public IceInternal.SocketStatus - socketReady() - { - return _connection.socketReady(); - } - - public void - socketFinished() - { - _connection.socketFinished(); - } - - public void - runTimerTask() - { - _connection.socketTimeout(); - } - - final private ConnectionI _connection; - }; - - private IceInternal.Transceiver _transceiver; - private Ice.BooleanHolder _hasMoreData = new Ice.BooleanHolder(false); - + private final IceInternal.Instance _instance; + private final IceInternal.Transceiver _transceiver; private String _desc; private final String _type; private final IceInternal.EndpointI _endpoint; - private final SocketReadyCallback _socketReadyCallback = new SocketReadyCallback(this); private ObjectAdapter _adapter; private IceInternal.ServantManager _servantManager; private final Logger _logger; private final IceInternal.TraceLevels _traceLevels; - private final IceInternal.ThreadPool _threadPool; - private final IceInternal.SelectorThread _selectorThread; + + private final IceInternal.Timer _timer; + private final IceInternal.TimerTask _writeTimeout; + private boolean _writeTimeoutScheduled; + private final IceInternal.TimerTask _readTimeout; + private boolean _readTimeoutScheduled; private StartCallback _startCallback = null; + private Ice.BooleanHolder _hasMoreData = new Ice.BooleanHolder(false); private final boolean _warn; - - private final int _acmTimeout; + private final boolean _warnUdp; + private final long _acmTimeout; private long _acmAbsoluteTimeoutMillis; private final int _compressionLevel; @@ -2369,18 +2468,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private int _batchMarker; private java.util.LinkedList<OutgoingMessage> _sendStreams = new java.util.LinkedList<OutgoingMessage>(); - private boolean _sendInProgress; - private java.util.List<OutgoingMessage> _sentCallbacks = new java.util.LinkedList<OutgoingMessage>(); - private IceInternal.ThreadPoolWorkItem _flushSentCallbacks = new IceInternal.ThreadPoolWorkItem() - { - public void - execute(IceInternal.ThreadPool threadPool) - { - threadPool.promoteFollower(null); - ConnectionI.this.flushSentCallbacks(); - }; - }; + private IceInternal.BasicStream _readStream; + private IceInternal.BasicStream _writeStream; private int _dispatchCount; @@ -2395,7 +2485,5 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private static boolean _compressionSupported = IceInternal.BasicStream.compressible(); - private boolean _overrideCompress; - private boolean _overrideCompressValue; private boolean _cacheBuffers; } diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java index 62589a5bad7..674e690d27b 100644 --- a/java/src/Ice/ObjectAdapterI.java +++ b/java/src/Ice/ObjectAdapterI.java @@ -23,8 +23,6 @@ public final class ObjectAdapterI implements ObjectAdapter public synchronized Communicator getCommunicator() { - checkForDeactivation(); - return _communicator; } @@ -364,7 +362,6 @@ public final class ObjectAdapterI implements ObjectAdapter // _instance = null; _threadPool = null; - _communicator = null; _routerEndpoints = null; _routerInfo = null; _publishedEndpoints = null; @@ -828,7 +825,6 @@ public final class ObjectAdapterI implements ObjectAdapter _deactivated = true; _destroyed = true; _instance = null; - _communicator = null; _incomingConnectionFactories = null; InitializationException ex = new InitializationException(); @@ -995,7 +991,6 @@ public final class ObjectAdapterI implements ObjectAdapter { IceUtilInternal.Assert.FinalizerAssert(_threadPool == null); //IceUtilInternal.Assert.FinalizerAssert(_servantManager == null); // Not cleared, it needs to be immutable. - IceUtilInternal.Assert.FinalizerAssert(_communicator == null); IceUtilInternal.Assert.FinalizerAssert(_incomingConnectionFactories == null); IceUtilInternal.Assert.FinalizerAssert(_directCount == 0); IceUtilInternal.Assert.FinalizerAssert(!_waitForActivate); diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java index ecc43a8918b..77112002911 100644 --- a/java/src/IceInternal/BasicStream.java +++ b/java/src/IceInternal/BasicStream.java @@ -54,36 +54,37 @@ public class BasicStream reset() { _buf.reset(); + clear(); + } + public void + clear() + { if(_readEncapsStack != null) { assert(_readEncapsStack.next == null); _readEncapsStack.next = _readEncapsCache; _readEncapsCache = _readEncapsStack; - _readEncapsStack = null; _readEncapsCache.reset(); + _readEncapsStack = null; } - if(_objectList != null) + if(_writeEncapsStack != null) { - _objectList.clear(); + assert(_writeEncapsStack.next == null); + _writeEncapsStack.next = _writeEncapsCache; + _writeEncapsCache = _writeEncapsStack; + _writeEncapsCache.reset(); + _writeEncapsStack = null; } - _sliceObjects = true; - } - - public void - clear() - { - _readEncapsStack = null; - _writeEncapsStack = null; _seqDataStack = null; - + if(_objectList != null) { _objectList.clear(); } - _objectList = null; + _sliceObjects = true; } @@ -1780,14 +1781,8 @@ public class BasicStream } catch(java.lang.Exception ex) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - IceUtilInternal.OutputBase out = new IceUtilInternal.OutputBase(pw); - out.setUseTab(false); - out.print("exception raised by ice_postUnmarshal:\n"); - ex.printStackTrace(pw); - pw.flush(); - _instance.initializationData().logger.warning(sw.toString()); + String s = "exception raised by ice_postUnmarshal:\n" + Ex.toString(ex); + _instance.initializationData().logger.warning("exception raised by ice_postUnmarshal:\n"); } } } @@ -1809,14 +1804,8 @@ public class BasicStream } catch(java.lang.Exception ex) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - IceUtilInternal.OutputBase out = new IceUtilInternal.OutputBase(pw); - out.setUseTab(false); - out.print("exception raised by ice_preMarshal:\n"); - ex.printStackTrace(pw); - pw.flush(); - _instance.initializationData().logger.warning(sw.toString()); + String s = "exception raised by ice_preUnmarshal:\n" + Ex.toString(ex); + _instance.initializationData().logger.warning("exception raised by ice_preUnmarshal:\n"); } v.__write(this); } diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index e5eb47d7957..b7b55502eea 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -297,9 +297,9 @@ public class ConnectRequestHandler _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem() { public void - execute(ThreadPool threadPool) + execute(ThreadPoolCurrent current) { - threadPool.promoteFollower(null); + current.ioCompleted(); flushRequestsWithException(ex); }; }); @@ -430,13 +430,13 @@ public class ConnectRequestHandler { request.os.pos(0); os.writeBlob(request.os.readBlob(request.os.size())); - _connection.finishBatchRequest(os, _compress); } catch(Ice.LocalException ex) { _connection.abortBatchRequest(); throw ex; } + _connection.finishBatchRequest(os, _compress); } p.remove(); } @@ -450,9 +450,9 @@ public class ConnectRequestHandler _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem() { public void - execute(ThreadPool threadPool) + execute(ThreadPoolCurrent current) { - threadPool.promoteFollower(null); + current.ioCompleted(); flushRequestsWithException(ex); }; }); @@ -467,9 +467,9 @@ public class ConnectRequestHandler _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem() { public void - execute(ThreadPool threadPool) + execute(ThreadPoolCurrent current) { - threadPool.promoteFollower(null); + current.ioCompleted(); flushRequestsWithException(ex); }; }); @@ -482,9 +482,9 @@ public class ConnectRequestHandler instance.clientThreadPool().execute(new ThreadPoolWorkItem() { public void - execute(ThreadPool threadPool) + execute(ThreadPoolCurrent current) { - threadPool.promoteFollower(null); + current.ioCompleted(); for(OutgoingAsyncMessageCallback callback : sentCallbacks) { callback.__sent(instance); diff --git a/java/src/IceInternal/ConnectionMonitor.java b/java/src/IceInternal/ConnectionMonitor.java index 654a5ca46d7..927a5ce768f 100644 --- a/java/src/IceInternal/ConnectionMonitor.java +++ b/java/src/IceInternal/ConnectionMonitor.java @@ -86,23 +86,6 @@ public final class ConnectionMonitor implements IceInternal.TimerTask { conn.monitor(now); } - catch(Ice.LocalException ex) - { - synchronized(this) - { - if(_instance == null) - { - return; - } - - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in connection monitor:\n" + sw.toString(); - _instance.initializationData().logger.error(s); - } - } catch(java.lang.Exception ex) { synchronized(this) @@ -111,11 +94,7 @@ public final class ConnectionMonitor implements IceInternal.TimerTask { return; } - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "unknown exception in connection monitor:\n" + sw.toString(); + String s = "exception in connection monitor:\n" + Ex.toString(ex); _instance.initializationData().logger.error(s); } } diff --git a/java/src/IceInternal/EndpointHostResolver.java b/java/src/IceInternal/EndpointHostResolver.java index c763fbca50d..0592b830039 100644 --- a/java/src/IceInternal/EndpointHostResolver.java +++ b/java/src/IceInternal/EndpointHostResolver.java @@ -25,11 +25,7 @@ public class EndpointHostResolver } catch(RuntimeException ex) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "cannot create thread for endpoint host resolver thread:\n" + sw.toString(); + String s = "cannot create thread for endpoint host resolver thread:\n" + Ex.toString(ex); _instance.initializationData().logger.error(s); throw ex; } @@ -151,38 +147,15 @@ public class EndpointHostResolver public void run() { - if(_instance.initializationData().threadHook != null) - { - _instance.initializationData().threadHook.start(); - } - try { EndpointHostResolver.this.run(); } - catch(Ice.LocalException ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in endpoint host resolver thread " + getName() + ":\n" + sw.toString(); - _instance.initializationData().logger.error(s); - } catch(java.lang.Exception ex) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "unknown exception in endpoint host resolver thread " + getName() + ":\n" + sw.toString(); + String s = "exception in endpoint host resolver thread " + getName() + ":\n" + Ex.toString(ex); _instance.initializationData().logger.error(s); } - - if(_instance.initializationData().threadHook != null) - { - _instance.initializationData().threadHook.stop(); - } } } diff --git a/java/src/IceInternal/EventHandler.java b/java/src/IceInternal/EventHandler.java index ba7dfd69103..328db4ac1ca 100644 --- a/java/src/IceInternal/EventHandler.java +++ b/java/src/IceInternal/EventHandler.java @@ -9,66 +9,39 @@ package IceInternal; -public abstract class EventHandler extends SelectorHandler +public abstract class EventHandler { // - // Return true if the handler is for a datagram transport, false otherwise. + // Called when there's a message ready to be processed. // - abstract public boolean datagram(); + abstract public void message(ThreadPoolCurrent current); // - // Return true if read() must be called before calling message(). + // Called when the event handler is unregistered. // - abstract public boolean readable(); + abstract public void finished(ThreadPoolCurrent current); // - // Read data via the event handler. May only be called if - // readable() returns true. - // - abstract public boolean read(BasicStream is); - - // - // A complete message has been received. + // Get a textual representation of the event handler. // - abstract public void message(BasicStream stream, ThreadPool threadPool); + abstract public String toString(); // - // Will be called if the event handler is finally - // unregistered. (Calling unregister() does not unregister - // immediately.) + // Get the native information of the handler, this is used by the selector. // - abstract public void finished(ThreadPool threadPool); + abstract public java.nio.channels.SelectableChannel fd(); // - // Propagate an exception to the event handler. + // In Java, it's possible that the transceiver reads more data than what was + // really asked. If this is the case, hasMoreData() returns true and the handler + // read() method should be called again (without doing a select()). This is + // handled by the Selector class (it adds the handler to a separate list of + // handlers if this method returns true.) // - abstract public void exception(Ice.LocalException ex); + abstract public boolean hasMoreData(); - // - // Get a textual representation of the event handler. - // - abstract public String toString(); - - public IceInternal.Instance - instance() - { - return _instance; - } - - protected - EventHandler(Instance instance) - { - _instance = instance; - _stream = new BasicStream(instance); - } - - protected Instance _instance; - - // - // The _stream data member is only for use by the ThreadPool or by the - // connection for validation. - // - protected BasicStream _stream; - boolean _serializing; - boolean _registered; + int _disabled = 0; + int _registered = 0; + int _ready = 0; + java.nio.channels.SelectionKey _key = null; } diff --git a/java/src/IceInternal/Ex.java b/java/src/IceInternal/Ex.java index 07ec004ff12..01f8f9ebdad 100644 --- a/java/src/IceInternal/Ex.java +++ b/java/src/IceInternal/Ex.java @@ -23,4 +23,16 @@ public class Ex throw new Ice.MemoryLimitException("requested " + requested + " bytes, maximum allowed is " + maximum + " bytes (see Ice.MessageSizeMax)"); } + + // + // A small utility to get the strack trace of the exception (which also includes toString()). + // + public static String toString(java.lang.Throwable ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + return sw.toString(); + } } diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index e83805776bf..d13039c1995 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -80,7 +80,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice // First we wait until the factory is destroyed. If we are using // an acceptor, we also wait for it to be closed. // - while(_state != StateClosed || _acceptor != null) + while(_state != StateFinished) { try { @@ -172,170 +172,111 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice } // - // Operations from SelectorHandler. - // - - public java.nio.channels.SelectableChannel - fd() - { - assert(_acceptor != null); - return _acceptor.fd(); - } - - public boolean - hasMoreData() - { - assert(_acceptor != null); - return false; - } - - // // Operations from EventHandler. // - public boolean - datagram() - { - return _endpoint.datagram(); - } - - public boolean - readable() - { - return false; - } - - public boolean - read(BasicStream unused) - { - assert(false); // Must not be called. - return false; - } - public void - message(BasicStream unused, ThreadPool threadPool) + message(ThreadPoolCurrent current) { Ice.ConnectionI connection = null; - - try + synchronized(this) { - synchronized(this) + if(_state >= StateClosed) { - if(_state != StateActive) + return; + } + else if(_state == StateHolding) + { + Thread.yield(); + return; + } + + // + // Reap connections for which destruction has completed. + // + java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator(); + while(p.hasNext()) + { + Ice.ConnectionI con = p.next(); + if(con.isFinished()) { - Thread.yield(); - return; + p.remove(); } - - // - // Reap connections for which destruction has completed. - // - java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator(); - while(p.hasNext()) + } + + // + // Now accept a new connection. + // + Transceiver transceiver = null; + try + { + transceiver = _acceptor.accept(); + } + catch(Ice.SocketException ex) + { + if(Network.noMoreFds(ex.getCause())) { - Ice.ConnectionI con = p.next(); - if(con.isFinished()) + try { - p.remove(); + String s = "fatal error: can't accept more connections:\n" + ex.getCause().getMessage(); + s += '\n' + _acceptor.toString(); + _instance.initializationData().logger.error(s); } - } - - // - // Now accept a new connection. - // - Transceiver transceiver = null; - try - { - transceiver = _acceptor.accept(); - } - catch(Ice.SocketException ex) - { - if(Network.noMoreFds(ex.getCause())) + finally { - try - { - String s = "fatal error: can't accept more connections:\n" + ex.getCause().getMessage(); - s += '\n' + _acceptor.toString(); - _instance.initializationData().logger.error(s); - } - finally - { - Runtime.getRuntime().halt(1); - } - } - - // Ignore socket exceptions. - return; - } - catch(Ice.TimeoutException ex) - { - // Ignore timeouts. - return; + Runtime.getRuntime().halt(1); + } } - catch(Ice.LocalException ex) + + // Ignore socket exceptions. + return; + } + catch(Ice.LocalException ex) + { + // Warn about other Ice local exceptions. + if(_warn) { - // Warn about other Ice local exceptions. - if(_warn) - { - warning(ex); - } - return; + warning(ex); } + return; + } - assert(transceiver != null); + assert(transceiver != null); + try + { + connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter); + } + catch(Ice.LocalException ex) + { try { - connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter); + transceiver.close(); } - catch(Ice.LocalException ex) + catch(Ice.LocalException exc) { - try - { - transceiver.close(); - } - catch(Ice.LocalException exc) - { - // Ignore - } - - if(_warn) - { - warning(ex); - } - return; + // Ignore } - _connections.add(connection); + if(_warn) + { + warning(ex); + } + return; } - } - finally - { - // - // This makes sure that we promote a follower before we leave the scope of the mutex - // above, but after we call accept() (if we call it). - // - threadPool.promoteFollower(null); + + _connections.add(connection); } + assert(connection != null); connection.start(this); } public synchronized void - finished(ThreadPool threadPool) + finished(ThreadPoolCurrent current) { - threadPool.promoteFollower(null); - assert(threadPool == ((Ice.ObjectAdapterI)_adapter).getThreadPool() && _state == StateClosed); - - _acceptor.close(); - _acceptor = null; - notifyAll(); - } - - public void - exception(Ice.LocalException ex) - { - assert(false); // Must not be called. + assert(_state == StateClosed); + setState(StateFinished); } public synchronized String @@ -350,6 +291,20 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice return _acceptor.toString(); } + public java.nio.channels.SelectableChannel + fd() + { + assert(_acceptor != null); + return _acceptor.fd(); + } + + public boolean + hasMoreData() + { + assert(_acceptor != null); + return false; + } + // // Operations from ConnectionI.StartCallback // @@ -369,11 +324,11 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice public synchronized void connectionStartFailed(Ice.ConnectionI connection, Ice.LocalException ex) { - if(_state == StateClosed) + if(_state >= StateClosed) { return; } - + if(_warn) { warning(ex); @@ -393,7 +348,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice public IncomingConnectionFactory(Instance instance, EndpointI endpoint, Ice.ObjectAdapter adapter, String adapterName) { - super(instance); + _instance = instance; _endpoint = endpoint; _adapter = adapter; _warn = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; @@ -448,6 +403,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice _endpoint = h.value; assert(_acceptor != null); _acceptor.listen(); + ((Ice.ObjectAdapterI)_adapter).getThreadPool().initialize(this); } } catch(java.lang.Exception ex) @@ -492,8 +448,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice finalize() throws Throwable { - IceUtilInternal.Assert.FinalizerAssert(_state == StateClosed); - IceUtilInternal.Assert.FinalizerAssert(_acceptor == null); + IceUtilInternal.Assert.FinalizerAssert(_state == StateFinished); IceUtilInternal.Assert.FinalizerAssert(_connections == null); super.finalize(); @@ -502,6 +457,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice private static final int StateActive = 0; private static final int StateHolding = 1; private static final int StateClosed = 2; + private static final int StateFinished = 3; private void setState(int state) @@ -521,7 +477,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice } if(_acceptor != null) { - ((Ice.ObjectAdapterI)_adapter).getThreadPool()._register(this); + ((Ice.ObjectAdapterI)_adapter).getThreadPool().register(this, SocketOperation.Read); } java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator(); @@ -541,7 +497,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice } if(_acceptor != null) { - ((Ice.ObjectAdapterI)_adapter).getThreadPool().unregister(this); + ((Ice.ObjectAdapterI)_adapter).getThreadPool().unregister(this, SocketOperation.Read); } java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator(); @@ -559,6 +515,10 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice { ((Ice.ObjectAdapterI)_adapter).getThreadPool().finish(this); } + else + { + state = StateFinished; + } java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator(); while(p.hasNext()) @@ -568,6 +528,16 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice } break; } + + case StateFinished: + { + assert(_state == StateClosed); + if(_acceptor != null) + { + _acceptor.close(); + } + break; + } } _state = state; @@ -577,14 +547,12 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice private void warning(Ice.LocalException ex) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "connection exception:\n" + sw.toString() + '\n' + _acceptor.toString(); + String s = "connection exception:\n" + Ex.toString(ex) + '\n' + _acceptor.toString(); _instance.initializationData().logger.warning(s); } + private final IceInternal.Instance _instance; + private Acceptor _acceptor; private final Transceiver _transceiver; private EndpointI _endpoint; diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index ab73e298d64..54006b95dd2 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -175,18 +175,6 @@ public final class Instance return _serverThreadPool; } - public synchronized SelectorThread - selectorThread() - { - if(_state == StateDestroyed) - { - throw new Ice.CommunicatorDestroyedException(); - } - - assert(_selectorThread != null); - return _selectorThread; - } - public synchronized EndpointHostResolver endpointHostResolver() { @@ -716,11 +704,7 @@ public final class Instance } catch(RuntimeException ex) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "cannot create thread for endpoint host resolver:\n" + sw.toString(); + String s = "cannot create thread for endpoint host resolver:\n" + Ex.toString(ex); _initData.logger.error(s); throw ex; } @@ -735,19 +719,13 @@ public final class Instance } catch(RuntimeException ex) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "cannot create thread for timer:\n" + sw.toString(); + String s = "cannot create thread for timer:\n" + Ex.toString(ex); _initData.logger.error(s); throw ex; } _clientThreadPool = new ThreadPool(this, "Ice.ThreadPool.Client", 0); - _selectorThread = new SelectorThread(this); - // // Add Process and Properties facets // @@ -781,7 +759,6 @@ public final class Instance IceUtilInternal.Assert.FinalizerAssert(_objectAdapterFactory == null); IceUtilInternal.Assert.FinalizerAssert(_clientThreadPool == null); IceUtilInternal.Assert.FinalizerAssert(_serverThreadPool == null); - IceUtilInternal.Assert.FinalizerAssert(_selectorThread == null); IceUtilInternal.Assert.FinalizerAssert(_endpointHostResolver == null); IceUtilInternal.Assert.FinalizerAssert(_timer == null); IceUtilInternal.Assert.FinalizerAssert(_routerManager == null); @@ -922,7 +899,6 @@ public final class Instance ThreadPool serverThreadPool = null; ThreadPool clientThreadPool = null; - SelectorThread selectorThread = null; EndpointHostResolver endpointHostResolver = null; synchronized(this) @@ -951,13 +927,6 @@ public final class Instance _clientThreadPool = null; } - if(_selectorThread != null) - { - _selectorThread.destroy(); - selectorThread = _selectorThread; - _selectorThread = null; - } - if(_endpointHostResolver != null) { _endpointHostResolver.destroy(); @@ -1027,10 +996,6 @@ public final class Instance { serverThreadPool.joinWithAllThreads(); } - if(selectorThread != null) - { - selectorThread.joinWithThread(); - } if(endpointHostResolver != null) { endpointHostResolver.joinWithThread(); @@ -1105,7 +1070,6 @@ public final class Instance private int _protocolSupport; private ThreadPool _clientThreadPool; private ThreadPool _serverThreadPool; - private SelectorThread _selectorThread; private EndpointHostResolver _endpointHostResolver; private RetryQueue _retryQueue; private Timer _timer; diff --git a/java/src/IceInternal/LocalExceptionWrapper.java b/java/src/IceInternal/LocalExceptionWrapper.java index 11529b52a6f..2a135f7eea7 100644 --- a/java/src/IceInternal/LocalExceptionWrapper.java +++ b/java/src/IceInternal/LocalExceptionWrapper.java @@ -62,11 +62,7 @@ public class LocalExceptionWrapper extends Exception } throw new LocalExceptionWrapper(new Ice.UnknownLocalException(((Ice.LocalException)ex).ice_name()), false); } - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - throw new LocalExceptionWrapper(new Ice.UnknownException(sw.toString()), false); + throw new LocalExceptionWrapper(new Ice.UnknownException(Ex.toString(ex)), false); } private Ice.LocalException _ex; diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java index d47147f6e35..3f875de3336 100644 --- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java +++ b/java/src/IceInternal/OutgoingAsyncMessageCallback.java @@ -105,9 +105,9 @@ abstract public class OutgoingAsyncMessageCallback __os.instance().clientThreadPool().execute(new ThreadPoolWorkItem() { public void - execute(ThreadPool threadPool) + execute(ThreadPoolCurrent current) { - threadPool.promoteFollower(null); + current.ioCompleted(); __exception(ex); } }); @@ -149,14 +149,8 @@ abstract public class OutgoingAsyncMessageCallback { if(instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - IceUtilInternal.OutputBase out = new IceUtilInternal.OutputBase(pw); - out.setUseTab(false); - out.print("exception raised by AMI callback:\n"); - ex.printStackTrace(pw); - pw.flush(); - instance.initializationData().logger.warning(sw.toString()); + String s = "exception raised by AMI callback:\n" + Ex.toString(ex); + instance.initializationData().logger.warning(s); } } diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 6ce51eac569..02eabeb4e81 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -669,6 +669,7 @@ public final class OutgoingConnectionFactory // is necessary to support the interruption of the connection initialization and validation // in case the communicator is destroyed. // + Ice.ConnectionI connection = null; try { if(_destroyed) @@ -676,23 +677,7 @@ public final class OutgoingConnectionFactory throw new Ice.CommunicatorDestroyedException(); } - Ice.ConnectionI connection = new Ice.ConnectionI(_instance, transceiver, ci.endpoint.compress(false),null); - - java.util.List<Ice.ConnectionI> connectionList = _connections.get(ci); - if(connectionList == null) - { - connectionList = new java.util.LinkedList<Ice.ConnectionI>(); - _connections.put(ci, connectionList); - } - connectionList.add(connection); - connectionList = _connectionsByEndpoint.get(ci.endpoint); - if(connectionList == null) - { - connectionList = new java.util.LinkedList<Ice.ConnectionI>(); - _connectionsByEndpoint.put(ci.endpoint, connectionList); - } - connectionList.add(connection); - return connection; + connection = new Ice.ConnectionI(_instance, transceiver, ci.endpoint.compress(false),null); } catch(Ice.LocalException ex) { @@ -706,6 +691,22 @@ public final class OutgoingConnectionFactory } throw ex; } + + java.util.List<Ice.ConnectionI> connectionList = _connections.get(ci); + if(connectionList == null) + { + connectionList = new java.util.LinkedList<Ice.ConnectionI>(); + _connections.put(ci, connectionList); + } + connectionList.add(connection); + connectionList = _connectionsByEndpoint.get(ci.endpoint); + if(connectionList == null) + { + connectionList = new java.util.LinkedList<Ice.ConnectionI>(); + _connectionsByEndpoint.put(ci.endpoint, connectionList); + } + connectionList.add(connection); + return connection; } private void diff --git a/java/src/IceInternal/PropertyNames.java b/java/src/IceInternal/PropertyNames.java index 15e20c6b9a5..62ecfb4d287 100644 --- a/java/src/IceInternal/PropertyNames.java +++ b/java/src/IceInternal/PropertyNames.java @@ -8,7 +8,7 @@ // ********************************************************************** // -// Generated by makeprops.py from file ../config/PropertyNames.xml, Wed Jul 29 10:07:20 2009 +// Generated by makeprops.py from file ./config/PropertyNames.xml, Tue Aug 11 09:12:32 2009 // IMPORTANT: Do not edit this file -- any edits made here will be lost! @@ -33,6 +33,7 @@ public final class PropertyNames new Property("Ice\\.Admin\\.ThreadPool\\.SizeWarn", false, null), new Property("Ice\\.Admin\\.ThreadPool\\.StackSize", false, null), new Property("Ice\\.Admin\\.ThreadPool\\.Serialize", false, null), + new Property("Ice\\.Admin\\.ThreadPool\\.ThreadIdleTime", false, null), new Property("Ice\\.Admin\\.ThreadPool\\.ThreadPriority", false, null), new Property("Ice\\.Admin\\.DelayCreation", false, null), new Property("Ice\\.Admin\\.Facets", false, null), @@ -100,12 +101,14 @@ public final class PropertyNames new Property("Ice\\.ThreadPool\\.Client\\.SizeWarn", false, null), new Property("Ice\\.ThreadPool\\.Client\\.StackSize", false, null), new Property("Ice\\.ThreadPool\\.Client\\.Serialize", false, null), + new Property("Ice\\.ThreadPool\\.Client\\.ThreadIdleTime", false, null), new Property("Ice\\.ThreadPool\\.Client\\.ThreadPriority", false, null), new Property("Ice\\.ThreadPool\\.Server\\.Size", false, null), new Property("Ice\\.ThreadPool\\.Server\\.SizeMax", false, null), new Property("Ice\\.ThreadPool\\.Server\\.SizeWarn", false, null), new Property("Ice\\.ThreadPool\\.Server\\.StackSize", false, null), new Property("Ice\\.ThreadPool\\.Server\\.Serialize", false, null), + new Property("Ice\\.ThreadPool\\.Server\\.ThreadIdleTime", false, null), new Property("Ice\\.ThreadPool\\.Server\\.ThreadPriority", false, null), new Property("Ice\\.ThreadPriority", false, null), new Property("Ice\\.Trace\\.GC", false, null), @@ -153,6 +156,7 @@ public final class PropertyNames new Property("IceBox\\.ServiceManager\\.ThreadPool\\.SizeWarn", false, null), new Property("IceBox\\.ServiceManager\\.ThreadPool\\.StackSize", false, null), new Property("IceBox\\.ServiceManager\\.ThreadPool\\.Serialize", false, null), + new Property("IceBox\\.ServiceManager\\.ThreadPool\\.ThreadIdleTime", false, null), new Property("IceBox\\.ServiceManager\\.ThreadPool\\.ThreadPriority", false, null), new Property("IceBox\\.Trace\\.ServiceObserver", false, null), new Property("IceBox\\.UseSharedCommunicator\\.[^\\s]+", false, null), @@ -200,6 +204,7 @@ public final class PropertyNames new Property("IceGrid\\.Node\\.ThreadPool\\.SizeWarn", false, null), new Property("IceGrid\\.Node\\.ThreadPool\\.StackSize", false, null), new Property("IceGrid\\.Node\\.ThreadPool\\.Serialize", false, null), + new Property("IceGrid\\.Node\\.ThreadPool\\.ThreadIdleTime", false, null), new Property("IceGrid\\.Node\\.ThreadPool\\.ThreadPriority", false, null), new Property("IceGrid\\.Node\\.AllowRunningServersAsRoot", false, null), new Property("IceGrid\\.Node\\.AllowEndpointsOverride", false, null), @@ -251,6 +256,7 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.SizeWarn", false, null), new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.StackSize", false, null), new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.Serialize", false, null), + new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.ThreadIdleTime", false, null), new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.ThreadPriority", false, null), new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.EndpointSelection", false, null), new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.ConnectionCached", false, null), @@ -274,6 +280,7 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.SizeWarn", false, null), new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.StackSize", false, null), new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.Serialize", false, null), + new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.ThreadIdleTime", false, null), new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.ThreadPriority", false, null), new Property("IceGrid\\.Registry\\.CryptPasswords", false, null), new Property("IceGrid\\.Registry\\.Data", false, null), @@ -292,6 +299,7 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.SizeWarn", false, null), new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.StackSize", false, null), new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.Serialize", false, null), + new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.ThreadIdleTime", false, null), new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.ThreadPriority", false, null), new Property("IceGrid\\.Registry\\.NodeSessionTimeout", false, null), new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.EndpointSelection", false, null), @@ -318,6 +326,7 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.SizeWarn", false, null), new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.StackSize", false, null), new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.Serialize", false, null), + new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.ThreadIdleTime", false, null), new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.ThreadPriority", false, null), new Property("IceGrid\\.Registry\\.SessionFilters", false, null), new Property("IceGrid\\.Registry\\.SessionManager\\.AdapterId", false, null), @@ -333,6 +342,7 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.SizeWarn", false, null), new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.StackSize", false, null), new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.Serialize", false, null), + new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.ThreadIdleTime", false, null), new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.ThreadPriority", false, null), new Property("IceGrid\\.Registry\\.SessionTimeout", false, null), new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.EndpointSelection", false, null), @@ -375,6 +385,7 @@ public final class PropertyNames new Property("IcePatch2\\.ThreadPool\\.SizeWarn", false, null), new Property("IcePatch2\\.ThreadPool\\.StackSize", false, null), new Property("IcePatch2\\.ThreadPool\\.Serialize", false, null), + new Property("IcePatch2\\.ThreadPool\\.ThreadIdleTime", false, null), new Property("IcePatch2\\.ThreadPool\\.ThreadPriority", false, null), new Property("IcePatch2\\.Admin\\.AdapterId", true, null), new Property("IcePatch2\\.Admin\\.Endpoints", true, null), @@ -465,6 +476,7 @@ public final class PropertyNames new Property("Glacier2\\.Client\\.ThreadPool\\.SizeWarn", false, null), new Property("Glacier2\\.Client\\.ThreadPool\\.StackSize", false, null), new Property("Glacier2\\.Client\\.ThreadPool\\.Serialize", false, null), + new Property("Glacier2\\.Client\\.ThreadPool\\.ThreadIdleTime", false, null), new Property("Glacier2\\.Client\\.ThreadPool\\.ThreadPriority", false, null), new Property("Glacier2\\.Client\\.AlwaysBatch", false, null), new Property("Glacier2\\.Client\\.Buffered", false, null), @@ -515,6 +527,7 @@ public final class PropertyNames new Property("Glacier2\\.Server\\.ThreadPool\\.SizeWarn", false, null), new Property("Glacier2\\.Server\\.ThreadPool\\.StackSize", false, null), new Property("Glacier2\\.Server\\.ThreadPool\\.Serialize", false, null), + new Property("Glacier2\\.Server\\.ThreadPool\\.ThreadIdleTime", false, null), new Property("Glacier2\\.Server\\.ThreadPool\\.ThreadPriority", false, null), new Property("Glacier2\\.Server\\.AlwaysBatch", false, null), new Property("Glacier2\\.Server\\.Buffered", false, null), diff --git a/java/src/IceInternal/Selector.java b/java/src/IceInternal/Selector.java index de45ba303fa..9427db70330 100644 --- a/java/src/IceInternal/Selector.java +++ b/java/src/IceInternal/Selector.java @@ -11,21 +11,18 @@ package IceInternal; public final class Selector { + static final class TimeoutException extends Exception + { + } + public - Selector(Instance instance, int timeout) + Selector(Instance instance) { _instance = instance; - _timeout = timeout; - _interruptCount = 0; - Network.SocketPair pair = Network.createPipe(); - _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; - _fdIntrWrite = pair.sink; try { _selector = java.nio.channels.Selector.open(); - pair.source.configureBlocking(false); - _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ); } catch(java.io.IOException ex) { @@ -52,87 +49,203 @@ public final class Selector { } _selector = null; + } - try + public void + initialize(EventHandler handler) + { + updateImpl(handler); + } + + public void + update(EventHandler handler, int remove, int add) + { + int previous = handler._registered; + handler._registered = handler._registered & ~remove; + handler._registered = handler._registered | add; + if(previous == handler._registered) { - _fdIntrWrite.close(); + return; } - catch(java.io.IOException ex) + updateImpl(handler); + + if(handler.hasMoreData() && (handler._disabled & SocketOperation.Read) == 0) { + if((add & SocketOperation.Read) != 0) + { + _pendingHandlers.add(handler); + } + if((remove & SocketOperation.Read) != 0) + { + _pendingHandlers.remove(handler); + } } - _fdIntrWrite = null; + } - try + public void + enable(EventHandler handler, int status) + { + if((handler._disabled & status) == 0) { - _fdIntrRead.close(); + return; } - catch(java.io.IOException ex) + handler._disabled = handler._disabled & ~status; + + if((handler._registered & status) != 0) { + updateImpl(handler); + + if((status & SocketOperation.Read) != 0 && handler.hasMoreData()) + { + // Add back the pending handler if reads are enabled. + _pendingHandlers.add(handler); + } } - _fdIntrRead = null; } public void - add(SelectorHandler handler, SocketStatus status) + disable(EventHandler handler, int status) { - // Note: we can't support noInterrupt for add() because a channel can't be registered again - // with the selector until the previous selection key has been removed from the cancel-key - // set of the selector on the next select() operation. - - handler._pendingStatus = status; - if(_changes.add(handler)) - { - setInterrupt(); + if((handler._disabled & status) != 0) + { + return; + } + handler._disabled = handler._disabled | status; + + if((handler._registered & status) != 0) + { + updateImpl(handler); + + if((status & SocketOperation.Read) != 0 && handler.hasMoreData()) + { + // Remove the pending handler if reads are disabled. + _pendingHandlers.remove(handler); + } } } public void - update(SelectorHandler handler, SocketStatus newStatus) + finish(EventHandler handler) { - // Note: can only be called from the select() thread - assert(handler._key != null); - handler._key.interestOps(convertStatus(handler.fd(), newStatus)); + handler._registered = 0; + + if(handler._key != null) + { + handler._key.cancel(); + handler._key = null; + } + + _changes.remove(handler); + _pendingHandlers.remove(handler); } public void - remove(SelectorHandler handler) + startSelect() { - // Note: we can't support noInterrupt for remove() because a channel can't be registered again - // with the selector until the previous selection key has been removed from the cancel-key - // set of the selector on the next select() operation. + assert(_changes.isEmpty()); - handler._pendingStatus = IceInternal.SocketStatus.Finished; - if(_changes.add(handler)) + // + // Don't set _selecting = true if there are pending handlers, select() won't block + // and will just call selectNow(). + // + if(_pendingHandlers.isEmpty()) { - setInterrupt(); + _selecting = true; } } public void - select() - throws java.io.IOException + finishSelect(java.util.List<EventHandler> handlers, long timeout) { - // - // If there are still interrupts, selected keys or pending handlers to process, - // return immediately. - // - if(_interrupted || !_keys.isEmpty() || !_pendingHandlers.isEmpty()) + _selecting = false; + handlers.clear(); + + if(!_changes.isEmpty()) { + for(EventHandler h : _changes) + { + updateImpl(h); + } + _changes.clear(); + } + else if(_keys.isEmpty() && _pendingHandlers.isEmpty() && timeout <= 0) + { + // + // This is necessary to prevent a busy loop in case of a spurious wake-up which + // sometime occurs in the client thread pool when the communicator is destroyed. + // If there are too many successive spurious wake-ups, we log an error. + // + try + { + Thread.sleep(1); + } + catch(java.lang.InterruptedException ex) + { + } + + if(++_spuriousWakeUp > 100) + { + _instance.initializationData().logger.error("spurious selector wake up"); + } return; } + + _spuriousWakeUp = 0; - // - // There's nothing left to process, we can now select. - // + for(java.nio.channels.SelectionKey key : _keys) + { + EventHandler handler = (EventHandler)key.attachment(); + try + { + handler._ready = fromJavaOps(key.readyOps()); + if(handler.hasMoreData() && _pendingHandlers.remove(handler)) + { + handler._ready |= SocketOperation.Read; + } + handlers.add(handler); + } + catch(java.nio.channels.CancelledKeyException ex) + { + assert(handler._registered == 0); + } + } + _keys.clear(); + + for(EventHandler handler : _pendingHandlers) + { + if(handler.hasMoreData()) + { + handler._ready = SocketOperation.Read; + handlers.add(handler); + } + } + _pendingHandlers.clear(); + } + + public void + select(long timeout) + throws TimeoutException + { while(true) { try { - if(_nextPendingHandlers.isEmpty()) + // + // Only block if _selecting = true, otherwise we call selectNow() to retrieve new + // ready handlers and process handlers from _pendingHandlers. + // + if(_selecting) { - if(_timeout > 0) + if(timeout > 0) { - _selector.select(_timeout * 1000); + long before = IceInternal.Time.currentMonotonicTimeMillis(); + if(_selector.select(timeout * 1000) == 0) + { + if(IceInternal.Time.currentMonotonicTimeMillis() - before >= timeout * 1000) + { + throw new TimeoutException(); + } + } } else { @@ -142,10 +255,6 @@ public final class Selector else { _selector.selectNow(); - - java.util.HashSet<SelectorHandler> tmp = _nextPendingHandlers; - _nextPendingHandlers = _pendingHandlers; - _pendingHandlers = tmp; } } catch(java.nio.channels.CancelledKeyException ex) @@ -166,326 +275,127 @@ public final class Selector continue; } - throw ex; + try + { + String s = "fatal error: selector failed:\n" + ex.getCause().getMessage(); + _instance.initializationData().logger.error(s); + } + finally + { + Runtime.getRuntime().halt(1); + } } break; } } - public SelectorHandler - getNextSelected() + public void + hasMoreData(EventHandler handler) { - assert(_interruptCount == 0); + assert(!_selecting && handler.hasMoreData()); - if(_iter == null && !_keys.isEmpty()) - { - _iter = _keys.iterator(); - } - - while(_iter != null && _iter.hasNext()) - { - java.nio.channels.SelectionKey key = _iter.next(); - _iter.remove(); - SelectorHandler handler = (SelectorHandler)key.attachment(); - if(handler == null) - { - assert(_pendingInterruptRead > 0); - _pendingInterruptRead -= readInterrupt(_pendingInterruptRead); - continue; - } - else if(handler._key == null || !handler._key.isValid()) - { - continue; - } - if(handler.hasMoreData()) - { - assert(_pendingIter == null); - _pendingHandlers.remove(handler); - } - if(!_iter.hasNext()) - { - _iter = null; - } - return handler; - } - - if(_pendingIter == null && !_pendingHandlers.isEmpty()) - { - _pendingIter = _pendingHandlers.iterator(); - } - - while(_pendingIter != null && _pendingIter.hasNext()) + // + // Only add the handler if read is still registered and enabled. + // + if((handler._registered & ~handler._disabled & SocketOperation.Read) != 0) { - SelectorHandler handler = _pendingIter.next(); - _pendingIter.remove(); - if(handler._key == null || !handler._key.isValid() || !handler.hasMoreData()) - { - continue; - } - if(!_pendingIter.hasNext()) - { - _pendingIter = null; - } - return handler; + _pendingHandlers.add(handler); } - - _iter = null; - _pendingIter = null; - return null; } - public void - hasMoreData(SelectorHandler handler) - { - _nextPendingHandlers.add(handler); - } - - public boolean - processInterrupt() + private void + updateImpl(EventHandler handler) { - assert(_changes.size() <= _interruptCount); - - if(!_changes.isEmpty()) + if(_selecting) { - for(SelectorHandler handler : _changes) - { - if(handler._pendingStatus == SocketStatus.Finished) - { - removeImpl(handler); - } - else - { - addImpl(handler, handler._pendingStatus); - } - clearInterrupt(); - } - _changes.clear(); - // - // We call selectNow() to flush the cancelled-key set and ensure handlers can be - // added again once this returns. + // Queue the change since we can't change the selection key interest ops while a select + // operation is in progress (it could block depending on the underlying implementaiton + // of the Java selector). // - try + if(_changes.isEmpty()) { - _selector.selectNow(); + _selector.wakeup(); } - catch(java.io.IOException ex) - { - // Ignore. - } - - // - // Current iterator is invalidated by selectNow(). - // - _iter = null; - _pendingIter = null; + _changes.add(handler); + return; } - - _interrupted = _interruptCount > 0; - return _interruptCount == 0; // No more interrupts to process. - } - public boolean - checkTimeout() - { - if(_interruptCount == 0 && _keys.isEmpty() && _pendingHandlers.isEmpty()) + int ops = toJavaOps(handler, handler._registered & ~handler._disabled); + if(handler._key == null) { - if(_timeout <= 0) + if(handler._registered != 0) { - // - // This is necessary to prevent a busy loop in case of a spurious wake-up which - // sometime occurs in the client thread pool when the communicator is destroyed. - // If there are too many successive spurious wake-ups, we log an error. - // try { - Thread.sleep(1); + handler._key = handler.fd().register(_selector, ops, handler); } - catch(java.lang.InterruptedException ex) + catch(java.nio.channels.ClosedChannelException ex) { + assert(false); } - - if(++_spuriousWakeUp > 100) - { - _instance.initializationData().logger.error("spurious selector wake up"); - } - return false; } - return true; } else { - _spuriousWakeUp = 0; - return false; - } - } - - public boolean - isInterrupted() - { - return _interruptCount > 0; - } - - public void - setInterrupt() - { - if(++_interruptCount == 1) - { - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - buf.put(0, (byte)0); - while(buf.hasRemaining()) - { - try - { - _fdIntrWrite.write(buf); - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } + handler._key.interestOps(ops); } } - public boolean - clearInterrupt() + int + toJavaOps(EventHandler handler, int o) { - if(--_interruptCount == 0) + int op = 0; + if((o & SocketOperation.Read) != 0) { - // - // If the interrupt byte has not been received by the pipe yet, we just increment - // _pendingInterruptRead. It will be read when the _fdIntrRead is ready for read. - // - if(_keys.contains(_fdIntrReadKey)) + if((handler.fd().validOps() & java.nio.channels.SelectionKey.OP_READ) != 0) { - readInterrupt(1); - _keys.remove(_fdIntrReadKey); - _iter = null; - _pendingIter = null; + op |= java.nio.channels.SelectionKey.OP_READ; } else { - ++_pendingInterruptRead; + op |= java.nio.channels.SelectionKey.OP_ACCEPT; } - _interrupted = false; - return false; - } - else - { - return true; } - } - - private int - readInterrupt(int count) - { - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(count); - try + if((o & SocketOperation.Write) != 0) { - buf.rewind(); - int ret = _fdIntrRead.read(buf); - assert(ret > 0); - return ret; + op |= java.nio.channels.SelectionKey.OP_WRITE; } - catch(java.io.IOException ex) + if((o & SocketOperation.Connect) != 0) { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - - private int - convertStatus(java.nio.channels.SelectableChannel fd, SocketStatus status) - { - if(status == SocketStatus.NeedConnect) - { - return java.nio.channels.SelectionKey.OP_CONNECT; - } - else if(status == SocketStatus.NeedRead) - { - if((fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) - { - return java.nio.channels.SelectionKey.OP_READ; - } - else - { - return java.nio.channels.SelectionKey.OP_ACCEPT; - } - } - else - { - assert(status == SocketStatus.NeedWrite); - return java.nio.channels.SelectionKey.OP_WRITE; + op |= java.nio.channels.SelectionKey.OP_CONNECT; } + return op; } - private void - addImpl(SelectorHandler handler, SocketStatus status) + int + fromJavaOps(int o) { - if(handler._key != null) + int op = 0; + if((o & (java.nio.channels.SelectionKey.OP_READ | java.nio.channels.SelectionKey.OP_ACCEPT)) != 0) { - handler._key.interestOps(convertStatus(handler.fd(), status)); + op |= SocketOperation.Read; } - else + if((o & java.nio.channels.SelectionKey.OP_WRITE) != 0) { - try - { - handler._key = handler.fd().register(_selector, convertStatus(handler.fd(), status), handler); - } - catch(java.nio.channels.ClosedChannelException ex) - { - assert(false); - } - assert(!_nextPendingHandlers.contains(handler)); + op |= SocketOperation.Write; } - - if(handler.hasMoreData()) + if((o & java.nio.channels.SelectionKey.OP_CONNECT) != 0) { - _nextPendingHandlers.add(handler); + op |= SocketOperation.Connect; } + return op; } - private void - removeImpl(SelectorHandler handler) - { - _nextPendingHandlers.remove(handler); - - if(handler._key != null) - { - try - { - handler._key.cancel(); - handler._key = null; - } - catch(java.nio.channels.CancelledKeyException ex) - { - assert(false); - } - } - } final private Instance _instance; - final private int _timeout; private java.nio.channels.Selector _selector; - private java.nio.channels.ReadableByteChannel _fdIntrRead; - private java.nio.channels.WritableByteChannel _fdIntrWrite; - private java.nio.channels.SelectionKey _fdIntrReadKey; private java.util.Set<java.nio.channels.SelectionKey> _keys; - private java.util.Iterator<java.nio.channels.SelectionKey> _iter; - private java.util.HashSet<SelectorHandler> _changes = new java.util.HashSet<SelectorHandler>(); - - private boolean _interrupted; + private java.util.HashSet<EventHandler> _changes = new java.util.HashSet<EventHandler>(); + private java.util.HashSet<EventHandler> _pendingHandlers = new java.util.HashSet<EventHandler>(); + private boolean _selecting; private int _spuriousWakeUp; - private int _interruptCount; - private int _pendingInterruptRead; - - private java.util.HashSet<SelectorHandler> _pendingHandlers = new java.util.HashSet<SelectorHandler>(); - private java.util.HashSet<SelectorHandler> _nextPendingHandlers = new java.util.HashSet<SelectorHandler>(); - private java.util.Iterator<SelectorHandler> _pendingIter; -}; +} diff --git a/java/src/IceInternal/SelectorHandler.java b/java/src/IceInternal/SelectorHandler.java deleted file mode 100644 index 39ec56f7929..00000000000 --- a/java/src/IceInternal/SelectorHandler.java +++ /dev/null @@ -1,31 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -abstract class SelectorHandler -{ - abstract public java.nio.channels.SelectableChannel fd(); - - // - // In Java, it's possible that the transceiver reads more data - // than what was really asked. If this is the case, hasMoreData() - // returns true and the handler read() method should be called - // again (without doing a select()). This is handled by the - // Selector class (it adds the handler to a separate list of handlers - // if this method returns true.) - // - abstract public boolean hasMoreData(); - - // - // The _key data member are only for use by the Selector. - // - protected java.nio.channels.SelectionKey _key; - protected SocketStatus _pendingStatus; -}; diff --git a/java/src/IceInternal/SelectorThread.java b/java/src/IceInternal/SelectorThread.java deleted file mode 100644 index 8dfab6449c7..00000000000 --- a/java/src/IceInternal/SelectorThread.java +++ /dev/null @@ -1,307 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -public class SelectorThread -{ - static public abstract class SocketReadyCallback extends SelectorHandler implements TimerTask - { - abstract public SocketStatus socketReady(); - abstract public void socketFinished(); - - // - // The selector thread doesn't unregister the callback when sockectTimeout is called; socketTimeout - // must unregister the callback either explicitly with unregister() or by shutting down the socket - // (if necessary). - // - //abstract void socketTimeout(); - - protected int _timeout; - protected SocketStatus _status; - protected SocketStatus _previousStatus; - } - - SelectorThread(Instance instance) - { - _instance = instance; - _destroyed = false; - _selector = new Selector(instance, 0); - - try - { - _thread = new HelperThread(); - if(_instance.initializationData().properties.getProperty("Ice.ThreadPriority") != "") - { - _thread.setPriority(Util.getThreadPriorityProperty(_instance.initializationData().properties, "Ice")); - } - _thread.start(); - } - catch(RuntimeException ex) - { - _selector.destroy(); - _selector = null; - - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "cannot create thread for selector thread:\n" + sw.toString(); - _instance.initializationData().logger.error(s); - throw ex; - } - - _timer = _instance.timer(); - } - - protected synchronized void - finalize() - throws Throwable - { - IceUtilInternal.Assert.FinalizerAssert(_destroyed); - } - - public synchronized void - destroy() - { - assert(!_destroyed); - _destroyed = true; - _selector.setInterrupt(); - } - - public synchronized void - _register(SocketReadyCallback cb, SocketStatus status, int timeout) - { - assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories. - assert(status != SocketStatus.Finished); - - cb._timeout = timeout; - cb._status = status; - if(cb._timeout >= 0) - { - _timer.schedule(cb, cb._timeout); - } - - _selector.add(cb, status); - } - - public synchronized void - unregister(SocketReadyCallback cb) - { - // Note: unregister should only be called from the socketReady() call-back. - assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories. - assert(cb._status != SocketStatus.Finished); - - _selector.remove(cb); - cb._status = SocketStatus.Finished; - } - - public synchronized void - finish(SocketReadyCallback cb) - { - assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories. - assert(cb._status != SocketStatus.Finished); - - _selector.remove(cb); - cb._status = SocketStatus.Finished; - - _finished.add(cb); - _selector.setInterrupt(); - } - - public void - joinWithThread() - { - if(_thread != null) - { - try - { - _thread.join(); - } - catch(InterruptedException ex) - { - } - } - } - - public void - run() - { - while(true) - { - try - { - _selector.select(); - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - //throw se; - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - se.printStackTrace(pw); - pw.flush(); - String s = "exception in selector thread:\n" + sw.toString(); - _instance.initializationData().logger.error(s); - continue; - } - - java.util.LinkedList<SocketReadyCallback> readyList = new java.util.LinkedList<SocketReadyCallback>(); - boolean finished = false; - - synchronized(this) - { - _selector.checkTimeout(); - - if(_selector.isInterrupted()) - { - if(_selector.processInterrupt()) - { - continue; - } - - // - // There are two possiblities for an interrupt: - // - // 1. The selector thread has been destroyed. - // 2. A callback is being finished - // - - // - // Thread destroyed? - // - if(_destroyed) - { - break; - } - - do - { - SocketReadyCallback cb = _finished.removeFirst(); - cb._previousStatus = SocketStatus.Finished; - readyList.add(cb); - } - while(_selector.clearInterrupt()); // As long as there are interrupts - finished = true; - } - else - { - SocketReadyCallback cb; - while((cb = (SocketReadyCallback)_selector.getNextSelected()) != null) - { - cb._previousStatus = cb._status; - readyList.add(cb); - } - } - } - - for(SocketReadyCallback cb : readyList) - { - SocketStatus status = SocketStatus.Finished; - try - { - if(cb._timeout >= 0) - { - _timer.cancel(cb); - } - - if(finished) - { - cb.socketFinished(); - } - else - { - status = cb.socketReady(); - } - } - catch(java.lang.Exception ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in selector thread " + _thread.getName() + - " while calling socketReady():\n" + sw.toString(); - _instance.initializationData().logger.error(s); - status = SocketStatus.Finished; - } - - if(status != SocketStatus.Finished) - { - if(cb.hasMoreData()) - { - _selector.hasMoreData(cb); - } - - if(status != cb._previousStatus) - { - synchronized(this) - { - // The callback might have been finished concurrently. - if(cb._status != SocketStatus.Finished) - { - _selector.update(cb, status); - cb._status = status; - } - } - } - - if(cb._timeout >= 0) - { - _timer.schedule(cb, cb._timeout); - } - } - } - } - - assert(_destroyed); - - _selector.destroy(); - } - - private Instance _instance; - private boolean _destroyed; - private Selector _selector; - private java.util.LinkedList<SocketReadyCallback> _finished = new java.util.LinkedList<SocketReadyCallback>(); - - private final class HelperThread extends Thread - { - HelperThread() - { - String threadName = _instance.initializationData().properties.getProperty("Ice.ProgramName"); - if(threadName.length() > 0) - { - threadName += "-"; - } - setName(threadName + "Ice.SelectorThread"); - } - - public void - run() - { - try - { - SelectorThread.this.run(); - } - catch(java.lang.Exception ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in selector thread " + getName() + ":\n" + sw.toString(); - _instance.initializationData().logger.error(s); - } - } - } - - private HelperThread _thread; - private Timer _timer; -} diff --git a/java/src/IceInternal/ServantManager.java b/java/src/IceInternal/ServantManager.java index ddee73a6693..c50ee74b45d 100644 --- a/java/src/IceInternal/ServantManager.java +++ b/java/src/IceInternal/ServantManager.java @@ -288,13 +288,9 @@ public final class ServantManager } catch(java.lang.Exception ex) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); String s = "exception during locator deactivation:\n" + "object adapter: `" + _adapterName + "'\n" + - "locator category: `" + p.getKey() + "'\n" + sw.toString(); - logger.error(s); + "locator category: `" + p.getKey() + "'\n" + Ex.toString(ex); + _instance.initializationData().logger.error(s); } } } diff --git a/java/src/IceInternal/SocketOperation.java b/java/src/IceInternal/SocketOperation.java new file mode 100644 index 00000000000..9a2350e1033 --- /dev/null +++ b/java/src/IceInternal/SocketOperation.java @@ -0,0 +1,20 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +import java.nio.channels.SelectionKey; + +public class SocketOperation +{ + public static final int None = 0; + public static final int Read = SelectionKey.OP_READ; + public static final int Write = SelectionKey.OP_WRITE; + public static final int Connect = SelectionKey.OP_CONNECT; +} diff --git a/java/src/IceInternal/SocketStatus.java b/java/src/IceInternal/SocketStatus.java deleted file mode 100644 index 5635b7958c8..00000000000 --- a/java/src/IceInternal/SocketStatus.java +++ /dev/null @@ -1,39 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -public final class SocketStatus -{ - private static SocketStatus[] _values = new SocketStatus[4]; - - public static final int _Finished = 0; - public static final SocketStatus Finished = new SocketStatus(_Finished); - public static final int _NeedConnect = 1; - public static final SocketStatus NeedConnect = new SocketStatus(_NeedConnect); - public static final int _NeedRead = 2; - public static final SocketStatus NeedRead = new SocketStatus(_NeedRead); - public static final int _NeedWrite = 3; - public static final SocketStatus NeedWrite = new SocketStatus(_NeedWrite); - - public int - value() - { - return _value; - } - - private - SocketStatus(int val) - { - _value = val; - _values[val] = this; - } - - private int _value; -} diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java index d322169a719..6d4a2901f5e 100644 --- a/java/src/IceInternal/TcpTransceiver.java +++ b/java/src/IceInternal/TcpTransceiver.java @@ -18,13 +18,13 @@ final class TcpTransceiver implements Transceiver return _fd; } - public SocketStatus + public int initialize() { if(_state == StateNeedConnect) { _state = StateConnectPending; - return SocketStatus.NeedConnect; + return SocketOperation.Connect; } else if(_state <= StateConnectPending) { @@ -51,13 +51,13 @@ final class TcpTransceiver implements Transceiver } } assert(_state == StateConnected); - return SocketStatus.Finished; + return SocketOperation.None; } public void close() { - if(_traceLevels.network >= 1) + if(_state == StateConnected && _traceLevels.network >= 1) { String s = "closing tcp connection\n" + toString(); _logger.trace(_traceLevels.networkCat, s); @@ -85,9 +85,13 @@ final class TcpTransceiver implements Transceiver { final int size = buf.b.limit(); int packetSize = size - buf.b.position(); - if(_maxPacketSize > 0 && packetSize > _maxPacketSize) + + // + // Limit packet size to avoid performance problems on WIN32 + // + if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) { - packetSize = _maxPacketSize; + packetSize = _maxSendPacketSize; buf.b.limit(buf.b.position() + packetSize); } @@ -108,7 +112,7 @@ final class TcpTransceiver implements Transceiver // Writing would block, so we reset the limit (if necessary) and return true to indicate // that more data must be sent. // - if(packetSize == _maxPacketSize) + if(packetSize == _maxSendPacketSize) { buf.b.limit(size); } @@ -117,7 +121,7 @@ final class TcpTransceiver implements Transceiver if(_traceLevels.network >= 3) { - String s = "sent " + ret + " of " + size + " bytes via tcp\n" + toString(); + String s = "sent " + ret + " of " + packetSize + " bytes via tcp\n" + toString(); _logger.trace(_traceLevels.networkCat, s); } @@ -126,13 +130,13 @@ final class TcpTransceiver implements Transceiver _stats.bytesSent(type(), ret); } - if(packetSize == _maxPacketSize) + if(packetSize == _maxSendPacketSize) { assert(buf.b.position() == buf.b.limit()); packetSize = size - buf.b.position(); - if(packetSize > _maxPacketSize) + if(packetSize > _maxSendPacketSize) { - packetSize = _maxPacketSize; + packetSize = _maxSendPacketSize; } buf.b.limit(buf.b.position() + packetSize); } @@ -154,11 +158,7 @@ final class TcpTransceiver implements Transceiver public boolean read(Buffer buf, Ice.BooleanHolder moreData) { - int remaining = 0; - if(_traceLevels.network >= 3) - { - remaining = buf.b.remaining(); - } + int packetSize = buf.b.remaining(); moreData.value = false; while(buf.b.hasRemaining()) @@ -182,7 +182,7 @@ final class TcpTransceiver implements Transceiver { if(_traceLevels.network >= 3) { - String s = "received " + ret + " of " + remaining + " bytes via tcp\n" + toString(); + String s = "received " + ret + " of " + packetSize + " bytes via tcp\n" + toString(); _logger.trace(_traceLevels.networkCat, s); } @@ -191,6 +191,8 @@ final class TcpTransceiver implements Transceiver _stats.bytesReceived(type(), ret); } } + + packetSize = buf.b.remaining(); } catch(java.io.InterruptedIOException ex) { @@ -240,7 +242,7 @@ final class TcpTransceiver implements Transceiver _state = connected ? StateConnected : StateNeedConnect; _desc = Network.fdToString(_fd); - _maxPacketSize = 0; + _maxSendPacketSize = 0; if(System.getProperty("os.name").startsWith("Windows")) { // @@ -248,10 +250,10 @@ final class TcpTransceiver implements Transceiver // poor throughput performances when transfering large amount of // data. See Microsoft KB article KB823764. // - _maxPacketSize = Network.getSendBufferSize(_fd) / 2; - if(_maxPacketSize < 512) + _maxSendPacketSize = Network.getSendBufferSize(_fd) / 2; + if(_maxSendPacketSize < 512) { - _maxPacketSize = 0; + _maxSendPacketSize = 0; } } } @@ -271,7 +273,7 @@ final class TcpTransceiver implements Transceiver private Ice.Stats _stats; private String _desc; private int _state; - private int _maxPacketSize; + private int _maxSendPacketSize; private static final int StateNeedConnect = 0; private static final int StateConnectPending = 1; diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 917c85182de..4494eb9396f 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -11,13 +11,62 @@ package IceInternal; public final class ThreadPool { - private final static boolean TRACE_REGISTRATION = false; - private final static boolean TRACE_INTERRUPT = false; - private final static boolean TRACE_SHUTDOWN = false; - private final static boolean TRACE_SELECT = false; - private final static boolean TRACE_EXCEPTION = false; - private final static boolean TRACE_THREAD = false; - private final static boolean TRACE_STACK_TRACE = false; + final class ShutdownWorkItem implements ThreadPoolWorkItem + { + public void execute(ThreadPoolCurrent current) + { + current.ioCompleted(); + try + { + _instance.objectAdapterFactory().shutdown(); + } + catch(Ice.CommunicatorDestroyedException ex) + { + } + } + } + + static final class FinishedWorkItem implements ThreadPoolWorkItem + { + public + FinishedWorkItem(EventHandler handler) + { + _handler = handler; + } + + public void execute(ThreadPoolCurrent current) + { + _handler.finished(current); + } + + private final EventHandler _handler; + } + + static final class JoinThreadWorkItem implements ThreadPoolWorkItem + { + public + JoinThreadWorkItem(EventHandlerThread thread) + { + _thread = thread; + } + + public void execute(ThreadPoolCurrent current) + { + // No call to ioCompleted, this shouldn't block (and we don't want to cause + // a new thread to be started). + _thread.join(); + } + + private final EventHandlerThread _thread; + } + + // + // Exception raised by the thread pool work queue when the thread pool + // is destroyed. + // + static final class DestroyedException extends RuntimeException + { + } public ThreadPool(Instance instance, String prefix, int timeout) @@ -25,32 +74,34 @@ public final class ThreadPool _instance = instance; _destroyed = false; _prefix = prefix; - _timeout = timeout; - _selector = new Selector(instance, timeout); + _selector = new Selector(instance); _threadIndex = 0; - _running = 0; _inUse = 0; - _load = 1.0; + _inUseIO = 0; _promote = true; _serialize = _instance.initializationData().properties.getPropertyAsInt(_prefix + ".Serialize") > 0; - _warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; + _serverIdleTime = timeout; + + Ice.Properties properties = _instance.initializationData().properties; - String programName = _instance.initializationData().properties.getProperty("Ice.ProgramName"); + String programName = properties.getProperty("Ice.ProgramName"); if(programName.length() > 0) { - _programNamePrefix = programName + "-"; + _threadPrefix = programName + "-" + _prefix; } else { - _programNamePrefix = ""; + _threadPrefix = _prefix; } + int nProcessors = Runtime.getRuntime().availableProcessors(); + // // We use just one thread as the default. This is the fastest // possible setting, still allows one level of nesting, and // doesn't require to make the servants thread safe. // - int size = _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1); + int size = properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1); if(size < 1) { String s = _prefix + ".Size < 1; Size adjusted to 1"; @@ -58,8 +109,11 @@ public final class ThreadPool size = 1; } - int sizeMax = - _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size); + int sizeMax = properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size); + if(sizeMax == -1) + { + sizeMax = nProcessors; + } if(sizeMax < size) { String s = _prefix + ".SizeMax < " + _prefix + ".Size; SizeMax adjusted to Size (" + size + ")"; @@ -67,7 +121,7 @@ public final class ThreadPool sizeMax = size; } - int sizeWarn = _instance.initializationData().properties.getPropertyAsInt( _prefix + ".SizeWarn"); + int sizeWarn = properties.getPropertyAsIntWithDefault(_prefix + ".SizeWarn", sizeMax * 80 / 100); if(sizeWarn != 0 && sizeWarn < size) { String s = _prefix + ".SizeWarn < " + _prefix + ".Size; adjusted SizeWarn to Size (" + size + ")"; @@ -81,28 +135,43 @@ public final class ThreadPool sizeWarn = sizeMax; } + int threadIdleTime = properties.getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60); + if(threadIdleTime < 0) + { + String s = _prefix + ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0"; + _instance.initializationData().logger.warning(s); + threadIdleTime = 0; + } + _size = size; _sizeMax = sizeMax; _sizeWarn = sizeWarn; + _sizeIO = Math.min(sizeMax, nProcessors); + _threadIdleTime = threadIdleTime; - int stackSize = _instance.initializationData().properties.getPropertyAsInt( _prefix + ".StackSize"); + int stackSize = properties.getPropertyAsInt( _prefix + ".StackSize"); if(stackSize < 0) { String s = _prefix + ".StackSize < 0; Size adjusted to JRE default"; _instance.initializationData().logger.warning(s); stackSize = 0; } - _stackSize = stackSize; - _hasPriority = _instance.initializationData().properties.getProperty(_prefix + ".ThreadPriority") != ""; - _priority = Util.getThreadPriorityProperty(_instance.initializationData().properties, _prefix); - if(!_hasPriority) + boolean hasPriority = properties.getProperty(_prefix + ".ThreadPriority") != ""; + int priority = properties.getPropertyAsInt(_prefix + ".ThreadPriority"); + if(!hasPriority) { - _hasPriority = _instance.initializationData().properties.getProperty("Ice.ThreadPriority") != ""; - _priority = Util.getThreadPriorityProperty(_instance.initializationData().properties, "Ice"); + hasPriority = properties.getProperty("Ice.ThreadPriority") != ""; + priority = properties.getPropertyAsInt("Ice.ThreadPriority"); } + _hasPriority = hasPriority; + _priority = priority; + + _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector); + _nextHandler = _handlers.iterator(); + if(_instance.traceLevels().threadPool >= 1) { String s = "creating " + _prefix + ": Size = " + _size + ", SizeMax = " + _sizeMax + ", SizeWarn = " + @@ -112,11 +181,9 @@ public final class ThreadPool try { - _threads = new java.util.ArrayList<EventHandlerThread>(); for(int i = 0; i < _size; i++) { - EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" + - _threadIndex++); + EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++); _threads.add(thread); if(_hasPriority) { @@ -126,16 +193,11 @@ public final class ThreadPool { thread.start(java.lang.Thread.NORM_PRIORITY); } - ++_running; } } catch(RuntimeException ex) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString(); + String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex); _instance.initializationData().logger.error(s); destroy(); @@ -154,158 +216,49 @@ public final class ThreadPool public synchronized void destroy() { - if(TRACE_SHUTDOWN) - { - trace("destroy"); - } - assert(!_destroyed); _destroyed = true; - _selector.setInterrupt(); + _workQueue.destroy(); } public synchronized void - _register(EventHandler handler) + initialize(EventHandler handler) { assert(!_destroyed); + _selector.initialize(handler); + } - if(!handler._registered) - { - if(TRACE_REGISTRATION) - { - trace("adding handler of type " + handler.getClass().getName() + " for channel " + handler.fd()); - } - - if(!handler._serializing) - { - _selector.add(handler, SocketStatus.NeedRead); - } - handler._registered = true; - } + public void + register(EventHandler handler, int op) + { + update(handler, SocketOperation.None, op); } public synchronized void - unregister(EventHandler handler) + update(EventHandler handler, int remove, int add) { assert(!_destroyed); - if(handler._registered) - { - if(TRACE_REGISTRATION) - { - trace("removing handler for channel " + handler.fd()); - } + _selector.update(handler, remove, add); + } - if(!handler._serializing) - { - _selector.remove(handler); - } - handler._registered = false; - } + public void + unregister(EventHandler handler, int op) + { + update(handler, op, SocketOperation.None); } public synchronized void finish(EventHandler handler) { assert(!_destroyed); - - if(TRACE_REGISTRATION) - { - trace("finishing handler for channel " + handler.fd()); - } - - if(handler._registered) - { - if(!handler._serializing) - { - _selector.remove(handler); - } - handler._registered = false; - } - - _finished.add(handler); - _selector.setInterrupt(); - } - - public synchronized void - execute(ThreadPoolWorkItem workItem) - { - if(_destroyed) - { - throw new Ice.CommunicatorDestroyedException(); - } - _workItems.add(workItem); - _selector.setInterrupt(); + _selector.finish(handler); + _workQueue.queue(new FinishedWorkItem(handler)); } public void - promoteFollower(EventHandler handler) + execute(ThreadPoolWorkItem workItem) { - if(_sizeMax > 1) - { - synchronized(this) - { - if(_serialize && handler != null) - { - handler._serializing = true; - if(handler._registered) - { - _selector.remove(handler); - } - } - - assert(!_promote); - _promote = true; - notify(); - - if(!_destroyed) - { - assert(_inUse >= 0); - ++_inUse; - - if(_inUse == _sizeWarn) - { - String s = "thread pool `" + _prefix + "' is running low on threads\n" - + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn; - _instance.initializationData().logger.warning(s); - } - - assert(_inUse <= _running); - if(_inUse < _sizeMax && _inUse == _running) - { - if(_instance.traceLevels().threadPool >= 1) - { - String s = "growing " + _prefix + ": Size = " + (_running + 1); - _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); - } - - try - { - EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" + - _threadIndex++); - _threads.add(thread); - if(_hasPriority) - { - thread.start(_priority); - } - else - { - thread.start(java.lang.Thread.NORM_PRIORITY); - } - ++_running; - } - catch(RuntimeException ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString(); - _instance.initializationData().logger.error(s); - } - } - } - } - } + _workQueue.queue(workItem); } public void @@ -319,12 +272,13 @@ public final class ThreadPool // for(EventHandlerThread thread : _threads) { - thread.join(true); + thread.join(); } // // Destroy the selector // + _workQueue.close(); _selector.destroy(); } @@ -334,656 +288,308 @@ public final class ThreadPool return _prefix; } - // - // Each thread supplies a BasicStream, to avoid creating excessive - // garbage (Java only). - // - private boolean - run(BasicStream stream) + private void + run(EventHandlerThread thread) { - if(_sizeMax > 1) + ThreadPoolCurrent current = new ThreadPoolCurrent(_instance, this); + boolean select = false; + while(true) { - synchronized(this) + if(current._handler != null) { - while(!_promote) + try { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + current._handler.message(current); + } + catch(DestroyedException ex) + { + return; + } + catch(java.lang.Exception ex) + { + String s = "exception in `" + _prefix + "':\n" + Ex.toString(ex); + s += "\nevent handler: " + current._handler.toString(); + _instance.initializationData().logger.error(s); } - - _promote = false; - } - - if(TRACE_THREAD) - { - trace("thread " + Thread.currentThread() + " has the lock"); - } - } - - while(true) - { - try - { - _selector.select(); - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - //throw se; - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - se.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "':\n" + sw.toString(); - _instance.initializationData().logger.error(s); - continue; } - - EventHandler handler = null; - ThreadPoolWorkItem workItem = null; - boolean finished = false; - boolean shutdown = false; - - synchronized(this) + else if(select) { - if(_selector.checkTimeout()) + try { - assert(_timeout > 0); - shutdown = true; + _selector.select(_serverIdleTime); } - else if(_selector.isInterrupted()) + catch(Selector.TimeoutException ex) { - if(_selector.processInterrupt()) + synchronized(this) { + if(!_destroyed && _inUse == 0) + { + _workQueue.queue(new ShutdownWorkItem()); // Select timed-out. + } continue; } - - // - // There are three possiblities for an interrupt: - // - // 1. The thread pool has been destroyed. - // - // 2. An event handler is being finished. - // - // 3. A work item has been scheduled. - // - - if(!_finished.isEmpty()) + } + } + + synchronized(this) + { + if(current._handler == null) + { + if(select) { - _selector.clearInterrupt(); - handler = _finished.removeFirst(); - finished = true; + _selector.finishSelect(_handlers, _serverIdleTime); + _nextHandler = _handlers.iterator(); + select = false; } - else if(!_workItems.isEmpty()) + else if(!current._leader && followerWait(thread, current)) { - // - // Work items must be executed first even if the thread pool is destroyed. - // - _selector.clearInterrupt(); - workItem = _workItems.removeFirst(); + return; // Wait timed-out. } - else if(_destroyed) + } + else if(_sizeMax > 1) + { + if(!current._ioCompleted) { // - // Don't clear the interrupt if destroyed, so that the other threads exit as well. + // The handler didn't call ioCompleted() so we take care of decreasing + // the IO thread count now. // - return true; + --_inUseIO; + + if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) + { + _selector.hasMoreData(current._handler); + } } else { - assert(false); + // + // If the handler called ioCompleted(), we re-enable the handler in + // case it was disabled and we decrease the number of thread in use. + // + _selector.enable(current._handler, current.operation); + assert(_inUse > 0); + --_inUse; } - } - else - { - handler = (EventHandler)_selector.getNextSelected(); - if(handler == null) + + if(!current._leader && followerWait(thread, current)) { - continue; + return; // Wait timed-out. } } - } - - // - // Now we are outside the thread synchronization. - // - - if(shutdown) - { - // - // Initiate server shutdown. - // - ObjectAdapterFactory factory; - try - { - factory = _instance.objectAdapterFactory(); - } - catch(Ice.CommunicatorDestroyedException e) + else if(!current._ioCompleted && + (current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) { - continue; + _selector.hasMoreData(current._handler); } - promoteFollower(null); - factory.shutdown(); - // - // No "continue", because we want shutdown to be done in - // its own thread from this pool. Therefore we called - // promoteFollower(). + // Get the next ready handler. // - } - else if(workItem != null) - { - try + if(_nextHandler.hasNext()) { - workItem.execute(this); + current._ioCompleted = false; + current._handler = _nextHandler.next(); + current.operation = current._handler._ready; } - catch(Ice.LocalException ex) + else { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "' while calling execute():\n" + sw.toString(); - _instance.initializationData().logger.error(s); + current._handler = null; } - // - // No "continue", because we want execute() to - // be called in its own thread from this - // pool. Note that this means that execute() - // must call promoteFollower(). - // - } - else - { - assert(handler != null); - - if(finished) + if(current._handler == null) { // - // Notify a handler about its removal from the thread pool. + // If there are no more ready handlers and there are still threads busy performing + // IO, we give up leadership and promote another follower (which will perform the + // select() only once all the IOs are completed). Otherwise, if there's no more + // threads peforming IOs, it's time to do another select(). // - try + if(_inUseIO > 0) { - handler.finished(this); + promoteFollower(current); } - catch(Ice.LocalException ex) + else { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "' while calling finished():\n" + - sw.toString() + "\n" + handler.toString(); - _instance.initializationData().logger.error(s); + _selector.startSelect(); + select = true; } - - // - // No "continue", because we want finished() to be - // called in its own thread from this pool. Note - // that this means that finished() must call - // promoteFollower(). - // } - else + else if(_sizeMax > 1) { // - // If the handler is "readable", try to read a - // message. + // Increment the IO thread count and if there's still threads available + // to perform IO and more handlers ready, we promote a follower. // - try - { - if(handler.readable()) - { - try - { - if(!read(handler)) - { - continue; // Can't read without blocking. - } - - if(handler.hasMoreData()) - { - _selector.hasMoreData(handler); - } - } - catch(Ice.TimeoutException ex) - { - assert(false); // This shouldn't occur as we only perform non-blocking reads. - continue; - } - catch(Ice.DatagramLimitException ex) // Expected. - { - handler._stream.resize(0, true); - continue; - } - catch(Ice.SocketException ex) - { - if(TRACE_EXCEPTION) - { - trace("informing handler (" + handler.getClass().getName() + - ") about exception " + ex); - ex.printStackTrace(); - } - - handler.exception(ex); - continue; - } - catch(Ice.LocalException ex) - { - if(handler.datagram()) - { - if(_instance.initializationData().properties.getPropertyAsInt( - "Ice.Warn.Connections") > 0) - { - _instance.initializationData().logger.warning( - "datagram connection exception:\n" + ex + "\n" + handler.toString()); - } - handler._stream.resize(0, true); - } - else - { - if(TRACE_EXCEPTION) - { - trace("informing handler (" + handler.getClass().getName() + - ") about exception " + ex); - ex.printStackTrace(); - } - - handler.exception(ex); - } - continue; - } - - stream.swap(handler._stream); - assert(stream.pos() == stream.size()); - } - - // - // Provide a new message to the handler. - // - try - { - handler.message(stream, this); - } - catch(Ice.LocalException ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "' while calling message():\n" + - sw.toString() + "\n" + handler.toString(); - _instance.initializationData().logger.error(s); - } - - // - // No "continue", because we want message() to - // be called in its own thread from this - // pool. Note that this means that message() - // must call promoteFollower(). - // - } - finally + ++_inUseIO; + if(_nextHandler.hasNext() && _inUseIO < _sizeIO) { - stream.reset(); + promoteFollower(current); } } } + } + } + + void + ioCompleted(ThreadPoolCurrent current) + { + current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called. - if(_sizeMax > 1) + if(_sizeMax > 1) + { + synchronized(this) { - synchronized(this) + --_inUseIO; + + if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) { - if(!_destroyed) - { - if(_serialize && handler != null && handler._serializing) - { - if(handler._registered) - { - _selector.add(handler, SocketStatus.NeedRead); - } - handler._serializing = false; - } + _selector.hasMoreData(current._handler); + } + if(_serialize && !_destroyed) + { + _selector.disable(current._handler, current.operation); + } + + if(current._leader) + { + // + // If this thread is still the leader, it's time to promote a new leader. + // + promoteFollower(current); + } + else if(_promote && (_nextHandler.hasNext() || _inUseIO == 0)) + { + notify(); + } - if(_size < _sizeMax) // Dynamic thread pool + assert(_inUse >= 0); + ++_inUse; + + if(_inUse == _sizeWarn) + { + String s = "thread pool `" + _prefix + "' is running low on threads\n" + + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn; + _instance.initializationData().logger.warning(s); + } + + if(!_destroyed) + { + assert(_inUse <= _threads.size()); + if(_inUse < _sizeMax && _inUse == _threads.size()) + { + if(_instance.traceLevels().threadPool >= 1) { - // - // First we reap threads that have been - // destroyed before. - // - int sz = _threads.size(); - assert(_running <= sz); - if(_running < sz) - { - java.util.Iterator<EventHandlerThread> i = _threads.iterator(); - while(i.hasNext()) - { - EventHandlerThread thread = i.next(); - - if(!thread.isAlive()) - { - if(thread.join(false)) - { - i.remove(); - } - } - } - } + String s = "growing " + _prefix + ": Size=" + (_threads.size() + 1); + _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); + } - // - // Now we check if this thread can be destroyed, based - // on a load factor. - // - - // - // The load factor jumps immediately to the number of - // threads that are currently in use, but decays - // exponentially if the number of threads in use is - // smaller than the load factor. This reflects that we - // create threads immediately when they are needed, - // but want the number of threads to slowly decline to - // the configured minimum. - // - double inUse = (double)_inUse; - if(_load < inUse) + try + { + EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++); + _threads.add(thread); + if(_hasPriority) { - _load = inUse; + thread.start(_priority); } else { - final double loadFactor = 0.05; // TODO: Configurable? - final double oneMinusLoadFactor = 1 - loadFactor; - _load = _load * oneMinusLoadFactor + _inUse * loadFactor; - } - - if(_running > _size) - { - int load = (int)(_load + 0.5); - - // - // We add one to the load factor because one - // additional thread is needed for select(). - // - if(load + 1 < _running) - { - if(_instance.traceLevels().threadPool >= 1) - { - String s = "shrinking " + _prefix + ": Size = " + (_running - 1); - _instance.initializationData().logger.trace( - _instance.traceLevels().threadPoolCat, s); - } - - assert(_inUse > 0); - --_inUse; - - assert(_running > 0); - --_running; - - return false; - } + thread.start(java.lang.Thread.NORM_PRIORITY); } } - - assert(_inUse > 0); - --_inUse; - } - - // - // Do not wait to be promoted again to release these objects. - // - handler = null; - workItem = null; - - while(!_promote) - { - try - { - wait(); - } - catch(InterruptedException ex) + catch(RuntimeException ex) { + String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex); + _instance.initializationData().logger.error(s); } } - - _promote = false; - } - - if(TRACE_THREAD) - { - trace("thread " + Thread.currentThread() + " has the lock"); } } } - } - - private boolean - read(EventHandler handler) - { - BasicStream stream = handler._stream; - - if(stream.pos() >= Protocol.headerSize) - { - if(!handler.read(stream)) - { - return false; - } - assert(stream.pos() == stream.size()); - return true; - } - - if(stream.size() == 0) + else if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) { - stream.resize(Protocol.headerSize, true); - stream.pos(0); - } - - if(stream.pos() != stream.size()) - { - if(!handler.read(stream)) + synchronized(this) { - return false; + _selector.hasMoreData(current._handler); } - assert(stream.pos() == stream.size()); - } - - int pos = stream.pos(); - if(pos < Protocol.headerSize) - { - // - // This situation is possible for small UDP packets. - // - throw new Ice.IllegalMessageSizeException(); - } - stream.pos(0); - byte[] m = new byte[4]; - m[0] = stream.readByte(); - m[1] = stream.readByte(); - m[2] = stream.readByte(); - m[3] = stream.readByte(); - if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] - || m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3]) - { - Ice.BadMagicException ex = new Ice.BadMagicException(); - ex.badMagic = m; - throw ex; } - - byte pMajor = stream.readByte(); - byte pMinor = stream.readByte(); - if(pMajor != Protocol.protocolMajor || pMinor > Protocol.protocolMinor) - { - Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException(); - e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor; - e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor; - e.major = Protocol.protocolMajor; - e.minor = Protocol.protocolMinor; - throw e; - } - - byte eMajor = stream.readByte(); - byte eMinor = stream.readByte(); - if(eMajor != Protocol.encodingMajor || eMinor > Protocol.encodingMinor) - { - Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException(); - e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor; - e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor; - e.major = Protocol.encodingMajor; - e.minor = Protocol.encodingMinor; - throw e; - } - - stream.readByte(); // messageType - stream.readByte(); // compress - int size = stream.readInt(); - if(size < Protocol.headerSize) - { - throw new Ice.IllegalMessageSizeException(); - } - if(size > _instance.messageSizeMax()) - { - Ex.throwMemoryLimitException(size, _instance.messageSizeMax()); - } - if(size > stream.size()) - { - stream.resize(size, true); - } - stream.pos(pos); - - if(stream.pos() != stream.size()) + } + + private synchronized void + promoteFollower(ThreadPoolCurrent current) + { + assert(!_promote && current._leader); + _promote = true; + if(_inUseIO < _sizeIO && (_nextHandler.hasNext() || _inUseIO == 0)) { - if(handler.datagram()) - { - if(_warnUdp) - { - _instance.initializationData().logger.warning("DatagramLimitException: maximum size of " - + stream.pos() + " exceeded"); - } - throw new Ice.DatagramLimitException(); - } - else - { - if(!handler.read(stream)) - { - return false; - } - assert(stream.pos() == stream.size()); - } + notify(); } - - return true; + current._leader = false; } -/* - * Commented out because it is unused. - * - private void - selectNonBlocking() + private synchronized boolean + followerWait(EventHandlerThread thread, ThreadPoolCurrent current) { - while(true) + assert(!current._leader); + + // + // It's important to clear the handler before waiting to make sure that + // resources for the handler are released now if it's finished. We also + // clear the per-thread stream. + // + current._handler = null; + current.stream.reset(); + + // + // Wait to be promoted and for all the IO threads to be done. + // + while(!_promote || _inUseIO == _sizeIO || _nextHandler.hasNext() && _inUseIO > 0) { - try + while(true) { - if(TRACE_SELECT) - { - trace("non-blocking select on " + _selector.keys().size() + " keys, thread id = " + - Thread.currentThread()); - } - - _selector.selectNow(); - - if(TRACE_SELECT) + try { - if(_keys.size() > 0) + if(_threadIdleTime > 0) { - trace("after selectNow, there are " + _keys.size() + " selected keys:"); - for(java.nio.channels.SelectionKey key : _keys) + long before = IceInternal.Time.currentMonotonicTimeMillis(); + wait(_threadIdleTime * 1000); + if(IceInternal.Time.currentMonotonicTimeMillis() - before >= _threadIdleTime * 1000) { - trace(" " + keyToString(key)); + if(_instance.traceLevels().threadPool >= 1) + { + String s = "shrinking " + _prefix + ": Size=" + (_threads.size() - 1); + _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); + } + assert(_threads.size() > 1); // Can only be called by a waiting follower thread. + _threads.remove(thread); + _workQueue.queue(new JoinThreadWorkItem(thread)); + return true; } } - } + else + { + wait(); + } - break; - } - catch(java.io.InterruptedIOException ex) - { - continue; - } - catch(java.io.IOException ex) - { - // - // Pressing Ctrl-C causes select() to raise an - // IOException, which seems like a JDK bug. We trap - // for that special case here and ignore it. - // Hopefully we're not masking something important! - // - if(ex.getMessage().indexOf("Interrupted system call") != -1) + break; + } + catch(InterruptedException ex) { - continue; } - - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - //throw se; - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - se.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "':\n" + sw.toString(); - _instance.initializationData().logger.error(s); - continue; } } - } -*/ - - private void - trace(String msg) - { - System.err.println(_prefix + ": " + msg); - } - - private String - keyToString(java.nio.channels.SelectionKey key) - { - String ops = "["; - if(key.isAcceptable()) - { - ops += " OP_ACCEPT"; - } - if(key.isReadable()) - { - ops += " OP_READ"; - } - if(key.isConnectable()) - { - ops += " OP_CONNECT"; - } - if(key.isWritable()) - { - ops += " OP_WRITE"; - } - ops += " ]"; - return key.channel() + " " + ops; + current._leader = true; // The current thread has become the leader. + _promote = false; + return false; } - private Instance _instance; + private final Instance _instance; + private final ThreadPoolWorkQueue _workQueue; private boolean _destroyed; private final String _prefix; - private final String _programNamePrefix; + private final String _threadPrefix; private final Selector _selector; - private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>(); - private java.util.LinkedList<EventHandler> _finished = new java.util.LinkedList<EventHandler>(); - private int _timeout; private final class EventHandlerThread implements Runnable { @@ -992,28 +598,18 @@ public final class ThreadPool _name = name; } - public boolean - isAlive() - { - return _thread.isAlive(); - } - - public boolean - join(boolean wait) + public void + join() { while(true) { try { _thread.join(); - return true; + break; } catch(InterruptedException ex) { - if(!wait) - { - return false; - } } } } @@ -1031,73 +627,65 @@ public final class ThreadPool { if(_instance.initializationData().threadHook != null) { - _instance.initializationData().threadHook.start(); + try + { + _instance.initializationData().threadHook.start(); + } + catch(java.lang.Exception ex) + { + String s = "thread hook start() method raised an unexpected exception in `"; + s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex); + _instance.initializationData().logger.error(s); + } } - BasicStream stream = new BasicStream(_instance); - - boolean promote; - try { - promote = ThreadPool.this.run(stream); + ThreadPool.this.run(this); } catch(java.lang.Exception ex) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "' thread " + _name + ":\n" + sw.toString(); + String s = "exception in `" + _prefix + "' thread " + _name + ":\n" + Ex.toString(ex); _instance.initializationData().logger.error(s); - promote = true; } - if(promote && _sizeMax > 1) + if(_instance.initializationData().threadHook != null) { - // - // Promote a follower, but w/o modifying _inUse or - // creating new threads. - // - synchronized(ThreadPool.this) + try { - assert(!_promote); - _promote = true; - ThreadPool.this.notify(); + _instance.initializationData().threadHook.stop(); + } + catch(java.lang.Exception ex) + { + String s = "thread hook stop() method raised an unexpected exception in `"; + s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex); + _instance.initializationData().logger.error(s); } - } - - if(TRACE_THREAD) - { - trace("run() terminated"); - } - - if(_instance.initializationData().threadHook != null) - { - _instance.initializationData().threadHook.stop(); } } - private String _name; + final private String _name; private Thread _thread; } private final int _size; // Number of threads that are pre-created. + private final int _sizeIO; // Number of threads that can concurrently perform IO. private final int _sizeMax; // Maximum number of threads. private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed. private final boolean _serialize; // True if requests need to be serialized over the connection. - + private final int _priority; + private final boolean _hasPriority; + private final long _serverIdleTime; + private final long _threadIdleTime; private final int _stackSize; - private java.util.List<EventHandlerThread> _threads; // All threads, running or not. + private java.util.List<EventHandlerThread> _threads = new java.util.ArrayList<EventHandlerThread>(); private int _threadIndex; // For assigning thread names. - private int _running; // Number of running threads. private int _inUse; // Number of threads that are currently in use. - private double _load; // Current load in number of threads. + private int _inUseIO; // Number of threads that are currently performing IO. - private boolean _promote; + private java.util.List<EventHandler> _handlers = new java.util.ArrayList<EventHandler>(); + private java.util.Iterator<EventHandler> _nextHandler; - private final boolean _warnUdp; - private int _priority; - private boolean _hasPriority = false; + private boolean _promote; } diff --git a/java/src/IceInternal/ThreadPoolCurrent.java b/java/src/IceInternal/ThreadPoolCurrent.java new file mode 100644 index 00000000000..6e12af421ad --- /dev/null +++ b/java/src/IceInternal/ThreadPoolCurrent.java @@ -0,0 +1,37 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public final class ThreadPoolCurrent +{ + ThreadPoolCurrent(Instance instance, ThreadPool threadPool) + { + operation = SocketOperation.None; + stream = new BasicStream(instance); + + _threadPool = threadPool; + _ioCompleted = false; + _leader = false; + } + + public int operation; + public BasicStream stream; // A per-thread stream to be used by event handlers for optimization. + + public void + ioCompleted() + { + _threadPool.ioCompleted(this); + } + + final ThreadPool _threadPool; + EventHandler _handler; + boolean _ioCompleted; + boolean _leader; +} diff --git a/java/src/IceInternal/ThreadPoolWorkItem.java b/java/src/IceInternal/ThreadPoolWorkItem.java index f45cf1af4bc..3aef961990f 100644 --- a/java/src/IceInternal/ThreadPoolWorkItem.java +++ b/java/src/IceInternal/ThreadPoolWorkItem.java @@ -11,5 +11,5 @@ package IceInternal; public interface ThreadPoolWorkItem { - void execute(ThreadPool threadPool); + void execute(ThreadPoolCurrent current); } diff --git a/java/src/IceInternal/ThreadPoolWorkQueue.java b/java/src/IceInternal/ThreadPoolWorkQueue.java new file mode 100644 index 00000000000..3157cd8a6d0 --- /dev/null +++ b/java/src/IceInternal/ThreadPoolWorkQueue.java @@ -0,0 +1,182 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +final class ThreadPoolWorkQueue extends EventHandler +{ + ThreadPoolWorkQueue(ThreadPool threadPool, Instance instance, Selector selector) + { + _threadPool = threadPool; + _instance = instance; + _selector = selector; + _destroyed = false; + + Network.SocketPair pair = Network.createPipe(); + _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; + _fdIntrWrite = pair.sink; + try + { + pair.source.configureBlocking(false); + } + catch(java.io.IOException ex) + { + Ice.SyscallException sys = new Ice.SyscallException(); + sys.initCause(ex); + throw sys; + } + + _selector.initialize(this); + _selector.update(this, SocketOperation.None, SocketOperation.Read); + } + + protected synchronized void + finalize() + throws Throwable + { + IceUtilInternal.Assert.FinalizerAssert(_destroyed); + } + + public synchronized void + close() + { + try + { + _fdIntrWrite.close(); + } + catch(java.io.IOException ex) + { + } + _fdIntrWrite = null; + + try + { + _fdIntrRead.close(); + } + catch(java.io.IOException ex) + { + } + _fdIntrRead = null; + } + + public synchronized + void destroy() + { + assert(!_destroyed); + _destroyed = true; + postMessage(); + } + + public synchronized void + queue(ThreadPoolWorkItem item) + { + if(_destroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + _workItems.add(item); + postMessage(); + } + + public void + message(ThreadPoolCurrent current) + { + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); + try + { + buf.rewind(); + int ret = _fdIntrRead.read(buf); + assert(ret > 0); + } + catch(java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } + + ThreadPoolWorkItem workItem = null; + synchronized(this) + { + if(!_workItems.isEmpty()) + { + workItem = _workItems.removeFirst(); + } + else + { + assert(_destroyed); + postMessage(); + } + } + + if(workItem != null) + { + workItem.execute(current); + } + else + { + _threadPool.ioCompleted(current); + throw new ThreadPool.DestroyedException(); + } + } + + public void + finished(ThreadPoolCurrent current) + { + assert(false); + } + + public String + toString() + { + return "work queue"; + } + + public java.nio.channels.SelectableChannel + fd() + { + return (java.nio.channels.SelectableChannel)_fdIntrRead; + } + + public boolean + hasMoreData() + { + return false; + } + + public void + postMessage() + { + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); + buf.put(0, (byte)0); + while(buf.hasRemaining()) + { + try + { + _fdIntrWrite.write(buf); + } + catch(java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } + } + } + + private final ThreadPool _threadPool; + private final Instance _instance; + private final Selector _selector; + boolean _destroyed; + + private java.nio.channels.ReadableByteChannel _fdIntrRead; + private java.nio.channels.WritableByteChannel _fdIntrWrite; + + private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>(); +}
\ No newline at end of file diff --git a/java/src/IceInternal/Timer.java b/java/src/IceInternal/Timer.java index c7840bd339a..d907dd827b9 100644 --- a/java/src/IceInternal/Timer.java +++ b/java/src/IceInternal/Timer.java @@ -235,11 +235,7 @@ public final class Timer extends Thread { if(_instance != null) { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "unexpected exception from task run method in timer thread:\n" + sw.toString(); + String s = "unexpected exception from task run method in timer thread:\n" + Ex.toString(ex); _instance.initializationData().logger.error(s); } } diff --git a/java/src/IceInternal/Transceiver.java b/java/src/IceInternal/Transceiver.java index ca3f028dc73..6cecdd4cab5 100644 --- a/java/src/IceInternal/Transceiver.java +++ b/java/src/IceInternal/Transceiver.java @@ -16,15 +16,9 @@ public interface Transceiver // // Initialize the transceiver. // - // Returns the status if the initialize operation. If timeout is != 0, - // the status will always be SocketStatus.Finished. If timeout is 0, - // the operation won't block and will return SocketStatus.NeedRead or - // SocketStatus.NeedWrite if the initialization couldn't be completed - // without blocking. This operation should be called again once the - // socket is ready for reading or writing and until it returns - // SocketStatus.Finished. - // - SocketStatus initialize(); + // Returns the status if the initialize operation. + // + int initialize(); void close(); diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java index 1dec5fde575..8a5cd336ad8 100644 --- a/java/src/IceInternal/UdpTransceiver.java +++ b/java/src/IceInternal/UdpTransceiver.java @@ -18,13 +18,13 @@ final class UdpTransceiver implements Transceiver return _fd; } - public SocketStatus + public int initialize() { // // Nothing to do. // - return SocketStatus.Finished; + return SocketOperation.None; } public void @@ -32,7 +32,7 @@ final class UdpTransceiver implements Transceiver { assert(_fd != null); - if(_traceLevels.network >= 1) + if(!_connect && _traceLevels.network >= 1) { String s = "closing udp connection\n" + toString(); _logger.trace(_traceLevels.networkCat, s); diff --git a/java/src/IceSSL/TransceiverI.java b/java/src/IceSSL/TransceiverI.java index e1e7e099cc3..60a434e7719 100644 --- a/java/src/IceSSL/TransceiverI.java +++ b/java/src/IceSSL/TransceiverI.java @@ -22,7 +22,7 @@ final class TransceiverI implements IceInternal.Transceiver return _fd; } - public IceInternal.SocketStatus + public int initialize() { try @@ -30,7 +30,7 @@ final class TransceiverI implements IceInternal.Transceiver if(_state == StateNeedConnect) { _state = StateConnectPending; - return IceInternal.SocketStatus.NeedConnect; + return IceInternal.SocketOperation.Connect; } else if(_state <= StateConnectPending) { @@ -40,8 +40,8 @@ final class TransceiverI implements IceInternal.Transceiver } assert(_state == StateConnected); - IceInternal.SocketStatus status = handshakeNonBlocking(); - if(status != IceInternal.SocketStatus.Finished) + int status = handshakeNonBlocking(); + if(status != IceInternal.SocketOperation.None) { return status; } @@ -56,13 +56,13 @@ final class TransceiverI implements IceInternal.Transceiver throw ex; } - return IceInternal.SocketStatus.Finished; + return IceInternal.SocketOperation.None; } public void close() { - if(_instance.networkTraceLevel() >= 1) + if(_state == StateHandshakeComplete && _instance.networkTraceLevel() >= 1) { String s = "closing ssl connection\n" + toString(); _logger.trace(_instance.networkTraceCategory(), s); @@ -147,10 +147,10 @@ final class TransceiverI implements IceInternal.Transceiver throw new Ice.ConnectionLostException(); } - IceInternal.SocketStatus status = writeNonBlocking(buf.b); - if(status != IceInternal.SocketStatus.Finished) + int status = writeNonBlocking(buf.b); + if(status != IceInternal.SocketOperation.None) { - assert(status == IceInternal.SocketStatus.NeedWrite); + assert(status == IceInternal.SocketOperation.Write); return false; } return true; @@ -213,10 +213,10 @@ final class TransceiverI implements IceInternal.Transceiver } case BUFFER_UNDERFLOW: { - IceInternal.SocketStatus status = readNonBlocking(); - if(status != IceInternal.SocketStatus.Finished) + int status = readNonBlocking(); + if(status != IceInternal.SocketOperation.None) { - assert(status == IceInternal.SocketStatus.NeedRead); + assert(status == IceInternal.SocketOperation.Read); moreData.value = false; return false; } @@ -345,7 +345,7 @@ final class TransceiverI implements IceInternal.Transceiver super.finalize(); } - private IceInternal.SocketStatus + private int handshakeNonBlocking() { try @@ -391,8 +391,8 @@ final class TransceiverI implements IceInternal.Transceiver case BUFFER_UNDERFLOW: { assert(status == javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP); - IceInternal.SocketStatus ss = readNonBlocking(); - if(ss != IceInternal.SocketStatus.Finished) + int ss = readNonBlocking(); + if(ss != IceInternal.SocketOperation.None) { return ss; } @@ -413,8 +413,8 @@ final class TransceiverI implements IceInternal.Transceiver result = _engine.wrap(_emptyBuffer, _netOutput); if(result.bytesProduced() > 0) { - IceInternal.SocketStatus ss = flushNonBlocking(); - if(ss != IceInternal.SocketStatus.Finished) + int ss = flushNonBlocking(); + if(ss != IceInternal.SocketOperation.None) { return ss; } @@ -457,7 +457,7 @@ final class TransceiverI implements IceInternal.Transceiver throw e; } - return IceInternal.SocketStatus.Finished; + return IceInternal.SocketOperation.None; } private void @@ -515,7 +515,7 @@ final class TransceiverI implements IceInternal.Transceiver } } - private IceInternal.SocketStatus + private int writeNonBlocking(ByteBuffer buf) { // @@ -578,10 +578,10 @@ final class TransceiverI implements IceInternal.Transceiver // if(_netOutput.position() > 0) { - IceInternal.SocketStatus ss = flushNonBlocking(); - if(ss != IceInternal.SocketStatus.Finished) + int ss = flushNonBlocking(); + if(ss != IceInternal.SocketOperation.None) { - assert(ss == IceInternal.SocketStatus.NeedWrite); + assert(ss == IceInternal.SocketOperation.Write); return ss; } } @@ -596,10 +596,10 @@ final class TransceiverI implements IceInternal.Transceiver } assert(_netOutput.position() == 0); - return IceInternal.SocketStatus.Finished; + return IceInternal.SocketOperation.None; } - private IceInternal.SocketStatus + private int flushNonBlocking() { _netOutput.flip(); @@ -612,7 +612,7 @@ final class TransceiverI implements IceInternal.Transceiver _netOutput.limit(_netOutput.position() + packetSize); } - IceInternal.SocketStatus status = IceInternal.SocketStatus.Finished; + int status = IceInternal.SocketOperation.None; while(_netOutput.hasRemaining()) { try @@ -626,7 +626,7 @@ final class TransceiverI implements IceInternal.Transceiver } else if(ret == 0) { - status = IceInternal.SocketStatus.NeedWrite; + status = IceInternal.SocketOperation.Write; break; } @@ -653,7 +653,7 @@ final class TransceiverI implements IceInternal.Transceiver } } - if(status == IceInternal.SocketStatus.Finished) + if(status == IceInternal.SocketOperation.None) { _netOutput.clear(); } @@ -666,7 +666,7 @@ final class TransceiverI implements IceInternal.Transceiver return status; } - private IceInternal.SocketStatus + private int readNonBlocking() { while(true) @@ -682,7 +682,7 @@ final class TransceiverI implements IceInternal.Transceiver } else if(ret == 0) { - return IceInternal.SocketStatus.NeedRead; + return IceInternal.SocketOperation.Read; } break; @@ -699,7 +699,7 @@ final class TransceiverI implements IceInternal.Transceiver } } - return IceInternal.SocketStatus.Finished; + return IceInternal.SocketOperation.None; } private void |