diff options
author | Benoit Foucher <benoit@zeroc.com> | 2008-03-06 10:13:42 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2008-03-06 10:13:42 +0100 |
commit | c6dbd090d9691cc0116a2967b2827b858b184dfe (patch) | |
tree | 6d2ad80c98665c9090b16f97c400ab4b33c7ab73 /java/src | |
parent | Merge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff) | |
download | ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.bz2 ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.xz ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.zip |
Removed thread-per-connection and added serialize mode
Diffstat (limited to 'java/src')
40 files changed, 1409 insertions, 3387 deletions
diff --git a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java index 3c95aed0cd3..9087b9e1c26 100644 --- a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java +++ b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java @@ -13,7 +13,7 @@ public abstract class AMI_Object_ice_flushBatchRequests extends IceInternal.Batc { public abstract void ice_exception(LocalException ex); - public final void __invoke(Ice.ObjectPrx prx) + public final boolean __invoke(Ice.ObjectPrx prx) { __acquireCallback(prx); try @@ -28,7 +28,7 @@ public abstract class AMI_Object_ice_flushBatchRequests extends IceInternal.Batc try { delegate = proxy.__getDelegate(true); - delegate.__getRequestHandler().flushAsyncBatchRequests(this); + return delegate.__getRequestHandler().flushAsyncBatchRequests(this); } catch(Ice.LocalException ex) { @@ -39,5 +39,6 @@ public abstract class AMI_Object_ice_flushBatchRequests extends IceInternal.Batc { __releaseCallback(ex); } + return false; } } diff --git a/java/src/Ice/AMI_Object_ice_invoke.java b/java/src/Ice/AMI_Object_ice_invoke.java index 46deda21576..dfcc40477f7 100644 --- a/java/src/Ice/AMI_Object_ice_invoke.java +++ b/java/src/Ice/AMI_Object_ice_invoke.java @@ -14,8 +14,8 @@ public abstract class AMI_Object_ice_invoke extends IceInternal.OutgoingAsync public abstract void ice_response(boolean ok, byte[] outParams); public abstract void ice_exception(LocalException ex); - public final void __invoke(Ice.ObjectPrx prx, String operation, OperationMode mode, - byte[] inParams, java.util.Map context) + public final boolean __invoke(Ice.ObjectPrx prx, String operation, OperationMode mode, + byte[] inParams, java.util.Map context) { __acquireCallback(prx); try @@ -23,11 +23,12 @@ public abstract class AMI_Object_ice_invoke extends IceInternal.OutgoingAsync __prepare(prx, operation, mode, context); __os.writeBlob(inParams); __os.endWriteEncaps(); - __send(); + return __send(); } catch(LocalException ex) { __releaseCallback(ex); + return false; } } diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 3005bdff402..f19a8083852 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -9,8 +9,7 @@ package Ice; -public final class ConnectionI extends IceInternal.EventHandler - implements Connection, IceInternal.SelectorThread.SocketReadyCallback +public final class ConnectionI extends IceInternal.EventHandler implements Connection { public interface StartCallback { @@ -18,23 +17,6 @@ public final class ConnectionI extends IceInternal.EventHandler void connectionStartFailed(ConnectionI connection, Ice.LocalException ex); } - public class CallFinished implements IceInternal.ThreadPoolWorkItem - { - public - CallFinished(ConnectionI connection) - { - _connection = connection; - } - - public void - execute(IceInternal.ThreadPool threadPool) - { - _connection.finished(threadPool); - } - - final private ConnectionI _connection; - } - public void start(StartCallback callback) { @@ -42,96 +24,44 @@ public final class ConnectionI extends IceInternal.EventHandler { synchronized(this) { - _startCallback = callback; - - // - // The connection might already be closed if the communicator was destroyed. - // - if(_state == StateClosed) + if(_state == StateClosed) // The connection might already be closed if the communicator was destroyed. { assert(_exception != null); throw _exception; } - } - - if(_threadPerConnection) - { - // - // In thread per connection mode, we create the thread for the connection. The - // intialization and validation of the connection is taken care of by the thread - // per connection. - // - try - { - _thread = new ThreadPerConnection(); - _thread.start(); - } - catch(java.lang.Exception ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - _logger.error("cannot create thread for connection:\n" + sw.toString()); - - // - // Clean up. - // - _thread = null; - Ice.SyscallException e = new Ice.SyscallException(); - e.initCause(ex); - throw e; - } - } - else - { - // - // Initialize the connection transceiver and then validate the connection. - // - IceInternal.SocketStatus status = initialize(0); + IceInternal.SocketStatus status = initialize(); if(status == IceInternal.SocketStatus.Finished) { - status = validate(0); + status = validate(); } - if(status == IceInternal.SocketStatus.Finished) + if(status != IceInternal.SocketStatus.Finished) { - finishStart(null); - return; // We're done! - } - - // - // If the initialization or validation couldn't be completed without potentially - // blocking, we register the connection with the selector thread and return. - // - int timeout; - IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideConnectTimeout) - { - timeout = defaultsAndOverrides.overrideConnectTimeoutValue; - } - else - { - timeout = _endpoint.timeout(); - } - - synchronized(this) - { - if(_state == StateClosed) + // + // If the initialization or validation couldn't be completed without potentially + // blocking, we register the connection with the selector thread and return. + // + int timeout; + IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideConnectTimeout) { - assert(_exception != null); - throw _exception; + timeout = defaultsAndOverrides.overrideConnectTimeoutValue; } + else + { + timeout = _endpoint.timeout(); + } + _sendInProgress = true; - _selectorThread._register(_transceiver.fd(), this, status, timeout); - } - } + _selectorThread._register(_socketReadyCallback, status, timeout); + + if(callback != null) + { + _startCallback = callback; + return; + } - if(callback == null) // Wait for the connection to be validated. - { - synchronized(this) - { while(_state <= StateNotValidated) { try @@ -153,16 +83,22 @@ public final class ConnectionI extends IceInternal.EventHandler } catch(Ice.LocalException ex) { - synchronized(this) + exception(ex); + if(callback != null) { - setState(StateClosed, ex); - if(callback != null) - { - return; - } + callback.connectionStartFailed(this, _exception); + return; + } + else + { + waitUntilFinished(); + throw ex; } - waitUntilFinished(); - throw ex; + } + + if(callback != null) + { + callback.connectionStartCompleted(this); } } @@ -253,39 +189,15 @@ public final class ConnectionI extends IceInternal.EventHandler return _state > StateNotValidated && _state < StateClosing; } - public boolean + public synchronized boolean isFinished() { - Thread threadPerConnection = null; - - synchronized(this) - { - if(_transceiver != null || _dispatchCount != 0 || (_thread != null && _thread.isAlive())) - { - return false; - } - - assert(_state == StateClosed); - - threadPerConnection = _thread; - _thread = null; - } - - if(threadPerConnection != null) + if(_transceiver != null || _dispatchCount != 0) { - while(true) - { - try - { - threadPerConnection.join(); - break; - } - catch(InterruptedException ex) - { - } - } + return false; } - + + assert(_state == StateClosed); return true; } @@ -314,105 +226,82 @@ public final class ConnectionI extends IceInternal.EventHandler } } - public void + public synchronized void waitUntilFinished() { - Thread threadPerConnection = null; - - synchronized(this) + // + // We wait indefinitely until connection closing has been + // initiated. We also wait indefinitely until all outstanding + // requests are completed. Otherwise we couldn't guarantee + // that there are no outstanding calls when deactivate() is + // called on the servant locators. + // + while(_state < StateClosing || _dispatchCount > 0) { - // - // We wait indefinitely until connection closing has been - // initiated. We also wait indefinitely until all outstanding - // requests are completed. Otherwise we couldn't guarantee - // that there are no outstanding calls when deactivate() is - // called on the servant locators. - // - while(_state < StateClosing || _dispatchCount > 0) + try { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + wait(); } - - // - // Now we must wait until close() has been called on the - // transceiver. - // - while(_transceiver != null) + catch(InterruptedException ex) { - try + } + } + + // + // Now we must wait until close() has been called on the + // transceiver. + // + while(_transceiver != null) + { + try + { + if(_state != StateClosed && _endpoint.timeout() >= 0) { - if(_state != StateClosed && _endpoint.timeout() >= 0) - { - long absoluteWaitTime = _stateTime + _endpoint.timeout(); - long waitTime = absoluteWaitTime - IceInternal.Time.currentMonotonicTimeMillis(); + long absoluteWaitTime = _stateTime + _endpoint.timeout(); + long waitTime = absoluteWaitTime - IceInternal.Time.currentMonotonicTimeMillis(); - if(waitTime > 0) - { - // - // We must wait a bit longer until we close this - // connection. - // - wait(waitTime); - if(IceInternal.Time.currentMonotonicTimeMillis() >= absoluteWaitTime) - { - setState(StateClosed, new CloseTimeoutException()); - } - } - else + if(waitTime > 0) + { + // + // We must wait a bit longer until we close this + // connection. + // + wait(waitTime); + if(IceInternal.Time.currentMonotonicTimeMillis() >= absoluteWaitTime) { - // - // We already waited long enough, so let's close this - // connection! - // setState(StateClosed, new CloseTimeoutException()); } - - // - // No return here, we must still wait until - // close() is called on the _transceiver. - // } else { - wait(); + // + // We already waited long enough, so let's close this + // connection! + // + setState(StateClosed, new CloseTimeoutException()); } + + // + // No return here, we must still wait until + // close() is called on the _transceiver. + // } - catch(InterruptedException ex) + else { + wait(); } } - - assert(_state == StateClosed); - - threadPerConnection = _thread; - _thread = null; - - // - // Clear the OA. See bug 1673 for the details of why this is necessary. - // - _adapter = null; - } - - if(threadPerConnection != null) - { - while(true) + catch(InterruptedException ex) { - try - { - threadPerConnection.join(); - break; - } - catch(InterruptedException ex) - { - } } } + + assert(_state == StateClosed); + + // + // Clear the OA. See bug 1673 for the details of why this is necessary. + // + _adapter = null; } synchronized public void @@ -507,7 +396,7 @@ public final class ConnectionI extends IceInternal.EventHandler return sent; // The request was sent. } - synchronized public void + synchronized public boolean sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response) throws IceInternal.LocalExceptionWrapper { @@ -546,9 +435,10 @@ public final class ConnectionI extends IceInternal.EventHandler os.writeInt(requestId); } + boolean sent; try { - sendMessage(new OutgoingMessage(out, out.__os(), compress, response)); + sent = sendMessage(new OutgoingMessage(out, out.__os(), compress, response)); } catch(Ice.LocalException ex) { @@ -564,6 +454,7 @@ public final class ConnectionI extends IceInternal.EventHandler // _asyncRequests.put(requestId, out); } + return sent; } public synchronized void @@ -812,7 +703,7 @@ public final class ConnectionI extends IceInternal.EventHandler return sent; } - synchronized public void + synchronized public boolean flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync) { while(_batchStreamInUse && _exception == null) @@ -834,7 +725,7 @@ public final class ConnectionI extends IceInternal.EventHandler if(_batchRequestNum == 0) { outAsync.__sent(this); - return; + return true; } // @@ -844,11 +735,12 @@ public final class ConnectionI extends IceInternal.EventHandler _batchStream.writeInt(_batchRequestNum); _batchStream.swap(outAsync.__os()); - + + boolean sent; try { OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.__os(), _batchRequestCompress, false); - sendMessage(message); + sent = sendMessage(message); } catch(Ice.LocalException ex) { @@ -864,6 +756,7 @@ public final class ConnectionI extends IceInternal.EventHandler _batchRequestNum = 0; _batchRequestCompress = false; _batchMarker = 0; + return sent; } synchronized public void @@ -941,12 +834,6 @@ public final class ConnectionI extends IceInternal.EventHandler return _endpoint; // No mutex protection necessary, _endpoint is immutable. } - public boolean - threadPerConnection() - { - return _threadPerConnection; // No mutex protection necessary, _threadPerConnection is immutable. - } - public synchronized void setAdapter(ObjectAdapter adapter) { @@ -1000,29 +887,41 @@ public final class ConnectionI extends IceInternal.EventHandler } // + // Operations from SelectorHandler + // + public java.nio.channels.SelectableChannel + fd() + { + return _transceiver.fd(); + } + + public boolean + hasMoreData() + { + return _hasMoreData.value; + } + + // // Operations from EventHandler // public boolean datagram() { - assert(!_threadPerConnection); // Only for use with a thread pool. return _endpoint.datagram(); // No mutex protection necessary, _endpoint is immutable. } public boolean readable() { - assert(!_threadPerConnection); // Only for use with a thread pool. return true; } public boolean read(IceInternal.BasicStream stream) { - assert(!_threadPerConnection); // Only for use with a thread pool. - - return _transceiver.read(stream.getBuffer(), 0, _hasMoreData); + assert(_transceiver != null); + return _transceiver.read(stream.getBuffer(), _hasMoreData); // // Updating _acmAbsoluteTimeoutMillis is too expensive here, @@ -1032,17 +931,9 @@ public final class ConnectionI extends IceInternal.EventHandler // } - public boolean - hasMoreData() - { - return _hasMoreData.value; - } - public void message(IceInternal.BasicStream stream, IceInternal.ThreadPool threadPool) { - assert(!_threadPerConnection); // Only for use with a thread pool. - MessageInfo info = new MessageInfo(stream); synchronized(this) @@ -1052,7 +943,7 @@ public final class ConnectionI extends IceInternal.EventHandler // there could be various race conditions with close // connection messages and other messages. // - threadPool.promoteFollower(); + threadPool.promoteFollower(this); if(_state != StateClosed) { @@ -1089,20 +980,14 @@ public final class ConnectionI extends IceInternal.EventHandler public void finished(IceInternal.ThreadPool threadPool) { - assert(!_threadPerConnection); // Only for use with a thread pool. - - threadPool.promoteFollower(); + threadPool.promoteFollower(null); Ice.LocalException localEx = null; synchronized(this) { - --_finishedCount; - if(_finishedCount > 0 || _state != StateClosed || _sendInProgress) - { - return; - } - + assert(threadPool == _threadPool && _state == StateClosed && !_sendInProgress); + try { _transceiver.close(); @@ -1115,16 +1000,20 @@ public final class ConnectionI extends IceInternal.EventHandler _transceiver = null; notifyAll(); } - - finishStart(_exception); - java.util.Iterator<OutgoingMessage> p = _queuedStreams.iterator(); + if(_startCallback != null) + { + _startCallback.connectionStartFailed(this, _exception); + _startCallback = null; + } + + java.util.Iterator<OutgoingMessage> p = _sendStreams.iterator(); while(p.hasNext()) { OutgoingMessage message = p.next(); message.finished(_exception); } - _queuedStreams.clear(); + _sendStreams.clear(); java.util.Iterator<IceInternal.Outgoing> q = _requests.values().iterator(); // _requests is immutable at this point. @@ -1206,10 +1095,19 @@ public final class ConnectionI extends IceInternal.EventHandler // Operations from SocketReadyCallback // public IceInternal.SocketStatus - socketReady(boolean finished) + socketReady() { - if(!finished) + StartCallback callback = null; + + synchronized(this) { + assert(_sendInProgress); + + if(_state == StateClosed) + { + return IceInternal.SocketStatus.Finished; + } + try { // @@ -1218,7 +1116,7 @@ public final class ConnectionI extends IceInternal.EventHandler // if(!_sendStreams.isEmpty()) { - if(!send(0)) + if(!send()) { return IceInternal.SocketStatus.NeedWrite; } @@ -1226,121 +1124,68 @@ public final class ConnectionI extends IceInternal.EventHandler } else { - // - // If there's nothing to send, we're still validating the connection. - // - int state; - synchronized(this) + if(_state == StateNotInitialized) { - assert(_state == StateClosed || _state <= StateNotValidated); - - if(_state == StateClosed) - { - assert(_exception != null); - throw _exception; - } - - state = _state; - } - - if(state == StateNotInitialized) - { - IceInternal.SocketStatus status = initialize(0); + IceInternal.SocketStatus status = initialize(); if(status != IceInternal.SocketStatus.Finished) { return status; } } - if(state <= StateNotValidated) + if(_state <= StateNotValidated) { - IceInternal.SocketStatus status = validate(0); + IceInternal.SocketStatus status = validate(); if(status != IceInternal.SocketStatus.Finished) { return status; } } - finishStart(null); + callback = _startCallback; + _startCallback = null; } } catch(Ice.LocalException ex) { - synchronized(this) - { - setState(StateClosed, ex); - } - } - } - - // - // If there's no more data to send or if connection validation is finished, we checkout - // the connection state to figure out whether or not it's time to unregister with the - // selector thread. - // - - synchronized(this) - { - assert(_sendInProgress); - if(_state == StateClosed) - { - assert(_startCallback == null || (!_threadPerConnection && !_registeredWithPool)); - - _queuedStreams.addAll(0, _sendStreams); - _sendInProgress = false; - - if(_threadPerConnection) - { - _transceiver.shutdownReadWrite(); - } - else - { - if(!_registeredWithPool) - { - _threadPool.execute(new CallFinished(this)); - ++_finishedCount; // For each unregistration, finished() is called once. - } - else - { - unregisterWithPool(); - } - } - - notifyAll(); - return IceInternal.SocketStatus.Finished; - } - else if(_queuedStreams.isEmpty()) - { - _sendInProgress = false; - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; - } + setState(StateClosed, ex); return IceInternal.SocketStatus.Finished; } - else + + assert(_sendStreams.isEmpty()); + _selectorThread.unregister(_socketReadyCallback); + _sendInProgress = false; + if(_acmTimeout > 0) { - java.util.LinkedList<OutgoingMessage> streams = _queuedStreams; - _queuedStreams = _sendStreams; - _sendStreams = streams; - return IceInternal.SocketStatus.NeedWrite; // We're not finished yet, there's more data to send! + _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; } } + + if(callback != null) + { + callback.connectionStartCompleted(this); + } + return IceInternal.SocketStatus.Finished; } - public void + public synchronized void + socketFinished() + { + assert(_sendInProgress && _state == StateClosed); + _sendInProgress = false; + _threadPool.finish(this); + } + + public synchronized void socketTimeout() { - synchronized(this) + if(_state <= StateNotValidated) { - if(_state <= StateNotValidated) - { - setState(StateClosed, new ConnectTimeoutException()); - } - else if(_state <= StateClosing) - { - setState(StateClosed, new TimeoutException()); - } + setState(StateClosed, new ConnectTimeoutException()); + } + else if(_state <= StateClosing) + { + setState(StateClosed, new TimeoutException()); } } @@ -1357,12 +1202,11 @@ public final class ConnectionI extends IceInternal.EventHandler } public ConnectionI(IceInternal.Instance instance, IceInternal.Transceiver transceiver, - IceInternal.EndpointI endpoint, ObjectAdapter adapter, boolean threadPerConnection) + IceInternal.EndpointI endpoint, ObjectAdapter adapter) { super(instance); final Ice.InitializationData initData = instance.initializationData(); - _threadPerConnection = threadPerConnection; _transceiver = transceiver; _desc = transceiver.toString(); _type = transceiver.type(); @@ -1370,8 +1214,6 @@ public final class ConnectionI extends IceInternal.EventHandler _adapter = adapter; _logger = initData.logger; // Cached for better performance. _traceLevels = instance.traceLevels(); // Cached for better performance. - _registeredWithPool = false; - _finishedCount = 0; _warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; _cacheBuffers = initData.properties.getPropertyAsIntWithDefault("Ice.CacheMessageBuffers", 1) == 1; _acmAbsoluteTimeoutMillis = 0; @@ -1425,26 +1267,13 @@ public final class ConnectionI extends IceInternal.EventHandler try { - if(!threadPerConnection) + if(_adapter != null) { - // - // Only set _threadPool if we really need it, i.e., if we are - // not in thread per connection mode. Thread pools have lazy - // initialization in Instance, and we don't want them to be - // created if they are not needed. - // - if(_adapter != null) - { - _threadPool = ((ObjectAdapterI)_adapter).getThreadPool(); - } - else - { - _threadPool = _instance.clientThreadPool(); - } + _threadPool = ((ObjectAdapterI)_adapter).getThreadPool(); } else { - _threadPool = null; // To satisfy the compiler. + _threadPool = _instance.clientThreadPool(); } _selectorThread = _instance.selectorThread(); @@ -1472,8 +1301,7 @@ public final class ConnectionI extends IceInternal.EventHandler IceUtilInternal.Assert.FinalizerAssert(_state == StateClosed); IceUtilInternal.Assert.FinalizerAssert(_transceiver == null); IceUtilInternal.Assert.FinalizerAssert(_dispatchCount == 0); - IceUtilInternal.Assert.FinalizerAssert(_thread == null); - IceUtilInternal.Assert.FinalizerAssert(_queuedStreams.isEmpty()); + IceUtilInternal.Assert.FinalizerAssert(_sendStreams.isEmpty()); IceUtilInternal.Assert.FinalizerAssert(_requests.isEmpty()); IceUtilInternal.Assert.FinalizerAssert(_asyncRequests.isEmpty()); @@ -1589,10 +1417,7 @@ public final class ConnectionI extends IceInternal.EventHandler { return; } - if(!_threadPerConnection) - { - registerWithPool(); - } + _threadPool._register(this); break; } @@ -1606,10 +1431,7 @@ public final class ConnectionI extends IceInternal.EventHandler { return; } - if(!_threadPerConnection) - { - unregisterWithPool(); - } + _threadPool.unregister(this); break; } @@ -1622,10 +1444,7 @@ public final class ConnectionI extends IceInternal.EventHandler { return; } - if(!_threadPerConnection) - { - registerWithPool(); // We need to continue to read in closing state. - } + _threadPool._register(this); break; } @@ -1640,39 +1459,12 @@ public final class ConnectionI extends IceInternal.EventHandler // The selector thread will register again the FD with the pool once it's // done. // - _selectorThread.unregister(_transceiver.fd()); - if(!_threadPerConnection) - { - unregisterWithPool(); - } - - _transceiver.shutdownWrite(); - } - else if(_threadPerConnection) - { - // - // If we are in thread per connection mode and the thread is started, we - // shutdown both for reading and writing. This will unblock the read call - // with an exception. The thread per connection closes the transceiver. - // - _transceiver.shutdownReadWrite(); + _selectorThread.finish(_socketReadyCallback); + _threadPool.unregister(this); } else { - if(!_registeredWithPool) - { - _threadPool.execute(new CallFinished(this)); - ++_finishedCount; // For each unregistration, finished() is called once. - } - else - { - unregisterWithPool(); - } - - // - // Prevent further writes. - // - _transceiver.shutdownWrite(); + _threadPool.finish(this); } break; } @@ -1752,46 +1544,24 @@ public final class ConnectionI extends IceInternal.EventHandler } private IceInternal.SocketStatus - initialize(int timeout) + initialize() { - try - { - IceInternal.SocketStatus status = _transceiver.initialize(timeout); - if(status != IceInternal.SocketStatus.Finished) - { - if(timeout != 0) - { - throw new Ice.TimeoutException(); - } - return status; - } - } - catch(Ice.TimeoutException ex) + IceInternal.SocketStatus status = _transceiver.initialize(); + if(status != IceInternal.SocketStatus.Finished) { - throw new Ice.ConnectTimeoutException(); - } - - synchronized(this) - { - if(_state == StateClosed) - { - assert(_exception != null); - throw _exception; - } - - // - // Update the connection description once the transceiver is initialized. - // - _desc = _transceiver.toString(); - - setState(StateNotValidated); + return status; } + // + // Update the connection description once the transceiver is initialized. + // + _desc = _transceiver.toString(); + setState(StateNotValidated); return IceInternal.SocketStatus.Finished; } private IceInternal.SocketStatus - validate(int timeout) + validate() { if(!_endpoint.datagram()) // Datagram connections are always implicitly validated. { @@ -1811,26 +1581,10 @@ public final class ConnectionI extends IceInternal.EventHandler IceInternal.TraceUtil.traceSend(os, _logger, _traceLevels); os.prepareWrite(); } - else - { - // The stream can only be non-empty if we're doing a non-blocking connection validation. - assert(!_threadPerConnection); - } - try - { - if(!_transceiver.write(os.getBuffer(), timeout)) - { - if(timeout != 0) - { - throw new Ice.TimeoutException(); - } - return IceInternal.SocketStatus.NeedWrite; - } - } - catch(Ice.TimeoutException ex) + if(!_transceiver.write(os.getBuffer())) { - throw new Ice.ConnectTimeoutException(); + return IceInternal.SocketStatus.NeedWrite; } } else // The client side has the passive role for connection validation. @@ -1841,26 +1595,10 @@ public final class ConnectionI extends IceInternal.EventHandler is.resize(IceInternal.Protocol.headerSize, true); is.pos(0); } - else - { - // The stream can only be non-empty if we're doing a non-blocking connection validation. - assert(!_threadPerConnection); - } - try + if(!_transceiver.read(is.getBuffer(), _hasMoreData)) { - if(!_transceiver.read(is.getBuffer(), timeout, _hasMoreData)) - { - if(timeout != 0) - { - throw new Ice.TimeoutException(); - } - return IceInternal.SocketStatus.NeedRead; - } - } - catch(Ice.TimeoutException ex) - { - throw new Ice.ConnectTimeoutException(); + return IceInternal.SocketStatus.NeedRead; } assert(is.pos() == IceInternal.Protocol.headerSize); @@ -1910,31 +1648,23 @@ public final class ConnectionI extends IceInternal.EventHandler } } - synchronized(this) - { - _stream.reset(); - - if(_state == StateClosed) - { - assert(_exception != null); - throw _exception; - } - - // - // We start out in holding state. - // - setState(StateHolding); - } + _stream.reset(); + // + // We start out in holding state. + // + setState(StateHolding); return IceInternal.SocketStatus.Finished; } private boolean - send(int timeout) + send() { assert(_transceiver != null); assert(!_sendStreams.isEmpty()); + boolean flushSentCallbacks = _sentCallbacks.isEmpty(); + while(!_sendStreams.isEmpty()) { OutgoingMessage message = _sendStreams.getFirst(); @@ -1958,34 +1688,56 @@ public final class ConnectionI extends IceInternal.EventHandler } - if(!_transceiver.write(message.stream.getBuffer(), timeout)) + if(!_transceiver.write(message.stream.getBuffer())) { - assert(timeout == 0); + if(flushSentCallbacks && !_sentCallbacks.isEmpty()) + { + _threadPool.execute(_flushSentCallbacks); + } return false; } - message.sent(this, timeout == 0); // timeout == 0 indicates that this is called by the selector thread. + message.sent(this, true); + + if(message.outAsync instanceof Ice.AMISentCallback) + { + _sentCallbacks.add(message); + } + _sendStreams.removeFirst(); } + if(flushSentCallbacks && !_sentCallbacks.isEmpty()) + { + _threadPool.execute(_flushSentCallbacks); + } return true; } + private void + flushSentCallbacks() + { + java.util.List<OutgoingMessage> sentCallbacks; + synchronized(this) + { + assert(_sentCallbacks != null && !_sentCallbacks.isEmpty()); + sentCallbacks = _sentCallbacks; + _sentCallbacks = new java.util.LinkedList<OutgoingMessage>(); + } + for(OutgoingMessage message : sentCallbacks) + { + message.outAsync.__sent(_instance); + } + } + private boolean sendMessage(OutgoingMessage message) { assert(_state != StateClosed); - - // - // If another thread is currently sending messages, we queue the - // message in _queuedStreams. It will be picked up eventually by - // the selector thread once the messages from _sendStreams are all - // sent. - // if(_sendInProgress) { message.adopt(); - _queuedStreams.addLast(message); + _sendStreams.addLast(message); return false; } @@ -2015,7 +1767,7 @@ public final class ConnectionI extends IceInternal.EventHandler IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels); } - if(_transceiver.write(message.stream.getBuffer(), 0)) + if(_transceiver.write(message.stream.getBuffer())) { message.sent(this, false); if(_acmTimeout > 0) @@ -2028,64 +1780,10 @@ public final class ConnectionI extends IceInternal.EventHandler _sendStreams.addLast(message); _sendInProgress = true; message.adopt(); - _selectorThread._register(_transceiver.fd(), this, IceInternal.SocketStatus.NeedWrite, _endpoint.timeout()); + _selectorThread._register(_socketReadyCallback, IceInternal.SocketStatus.NeedWrite, _endpoint.timeout()); return false; } - private void - finishStart(Ice.LocalException ex) - { - // - // We set _startCallback to null to break potential cyclic reference count - // and because the finalizer checks for it to ensure that we always invoke - // on the callback. - // - - StartCallback callback = null; - synchronized(this) - { - callback = _startCallback; - _startCallback = null; - } - - if(callback != null) - { - if(ex == null) - { - callback.connectionStartCompleted(this); - } - else - { - callback.connectionStartFailed(this, ex); - } - } - } - - private void - registerWithPool() - { - assert(!_threadPerConnection); // Only for use with a thread pool. - - if(!_registeredWithPool) - { - _threadPool._register(_transceiver.fd(), this); - _registeredWithPool = true; - } - } - - private void - unregisterWithPool() - { - assert(!_threadPerConnection); // Only for use with a thread pool. - - if(_registeredWithPool) - { - _threadPool.unregister(_transceiver.fd()); - _registeredWithPool = false; - ++_finishedCount; // For each unregistration, finished() is called once. - } - } - private IceInternal.BasicStream doCompress(IceInternal.BasicStream uncompressed, boolean compress) { @@ -2166,9 +1864,8 @@ public final class ConnectionI extends IceInternal.EventHandler try { // - // We don't need to check magic and version here. This has - // already been done by the ThreadPool or ThreadPerConnection, - // which provides us with the stream. + // We don't need to check magic and version here. This has already + // been done by the ThreadPool which provides us with the stream. // assert(info.stream.pos() == info.stream.size()); info.stream.pos(8); @@ -2392,305 +2089,6 @@ public final class ConnectionI extends IceInternal.EventHandler } private void - run() - { - try - { - // - // Initialize the connection transceiver and validate the connection using - // blocking operations. - // - - IceInternal.SocketStatus status; - - int timeout; - IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideConnectTimeout) - { - timeout = defaultsAndOverrides.overrideConnectTimeoutValue; - } - else - { - timeout = _endpoint.timeout(); - } - - status = initialize(timeout); - assert(status == IceInternal.SocketStatus.Finished); - - status = validate(timeout); - assert(status == IceInternal.SocketStatus.Finished); - } - catch(LocalException ex) - { - synchronized(this) - { - setState(StateClosed, ex); - - if(_transceiver != null) - { - try - { - _transceiver.close(); - } - catch(LocalException e) - { - // Here we ignore any exceptions in close(). - } - - _transceiver = null; - } - notifyAll(); - } - - finishStart(_exception); - return; - } - - finishStart(null); - - boolean warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; - - boolean closed = false; - - IceInternal.BasicStream stream = new IceInternal.BasicStream(_instance); - while(!closed) - { - // - // We must read new messages outside the thread synchronization because we use blocking reads. - // - - try - { - try - { - stream.resize(IceInternal.Protocol.headerSize, true); - stream.pos(0); - _transceiver.read(stream.getBuffer(), -1, _hasMoreData); - - int pos = stream.pos(); - if(pos < IceInternal.Protocol.headerSize) - { - // - // This situation is possible for small UDP packets. - // - throw new IllegalMessageSizeException(); - } - stream.pos(0); - byte[] m = stream.readBlob(4); - if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || - m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) - { - BadMagicException ex = new BadMagicException(); - ex.badMagic = m; - throw ex; - } - byte pMajor = stream.readByte(); - byte pMinor = stream.readByte(); - if(pMajor != IceInternal.Protocol.protocolMajor) - { - UnsupportedProtocolException e = new UnsupportedProtocolException(); - e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor; - e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor; - e.major = IceInternal.Protocol.protocolMajor; - e.minor = IceInternal.Protocol.protocolMinor; - throw e; - } - byte eMajor = stream.readByte(); - byte eMinor = stream.readByte(); - if(eMajor != IceInternal.Protocol.encodingMajor) - { - UnsupportedEncodingException e = new UnsupportedEncodingException(); - e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor; - e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor; - e.major = IceInternal.Protocol.encodingMajor; - e.minor = IceInternal.Protocol.encodingMinor; - throw e; - } - byte messageType = stream.readByte(); - byte compress = stream.readByte(); - int size = stream.readInt(); - if(size < IceInternal.Protocol.headerSize) - { - throw new IllegalMessageSizeException(); - } - if(size > _instance.messageSizeMax()) - { - throw new MemoryLimitException(); - } - if(size > stream.size()) - { - stream.resize(size, true); - } - stream.pos(pos); - - if(pos != stream.size()) - { - if(_endpoint.datagram()) - { - if(warnUdp) - { - _logger.warning("DatagramLimitException: maximum size of " + pos + " exceeded"); - } - throw new DatagramLimitException(); - } - else - { - _transceiver.read(stream.getBuffer(), -1, _hasMoreData); - assert(stream.pos() == stream.size()); - } - } - } - catch(DatagramLimitException ex) // Expected. - { - continue; - } - catch(SocketException ex) - { - exception(ex); - } - catch(LocalException ex) - { - if(_endpoint.datagram()) - { - if(_warn) - { - _logger.warning("datagram connection exception:\n" + ex + "\n" + _desc); - } - continue; - } - else - { - exception(ex); - } - } - - MessageInfo info = new MessageInfo(stream); - - LocalException localEx = null; - - synchronized(this) - { - while(_state == StateHolding) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - if(_state != StateClosed) - { - parseMessage(info); - } - - // - // parseMessage() can close the connection, so we must - // check for closed state again. - // - if(_state == StateClosed) - { - if(_sendInProgress) - { - _selectorThread.unregister(_transceiver.fd()); - } - - // - // Prevent further writes. - // - _transceiver.shutdownWrite(); - - while(_sendInProgress) - { - try - { - wait(); - } - catch(java.lang.Exception ex) - { - } - } - - try - { - _transceiver.close(); - } - catch(LocalException ex) - { - localEx = ex; - } - _transceiver = null; - notifyAll(); - - // - // We cannot simply return here. We have to make sure - // that all requests (regular and async) are notified - // about the closed connection below. - // - closed = true; - } - } - - // - // Asynchronous replies must be handled outside the thread - // synchronization, so that nested calls are possible. - // - if(info.outAsync != null) - { - info.outAsync.__finished(info.stream); - } - - // - // Method invocation (or multiple invocations for batch messages) - // must be done outside the thread synchronization, so that nested - // calls are possible. - // - invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, - info.adapter); - - if(closed) - { - java.util.Iterator<OutgoingMessage> p = _queuedStreams.iterator(); - while(p.hasNext()) - { - OutgoingMessage message = p.next(); - message.finished(_exception); - } - _queuedStreams.clear(); - - java.util.Iterator<IceInternal.Outgoing> q = _requests.values().iterator(); - while(q.hasNext()) - { - IceInternal.Outgoing out = q.next(); - out.finished(_exception); // The exception is immutable at this point. - } - _requests.clear(); - - java.util.Iterator<IceInternal.OutgoingAsync> r = _asyncRequests.values().iterator(); - while(r.hasNext()) - { - IceInternal.OutgoingAsync out = r.next(); - out.__finished(_exception); // The exception is immutable at this point. - } - _asyncRequests.clear(); - } - - if(localEx != null) - { - assert(closed); - throw localEx; - } - } - finally - { - stream.reset(); - } - } - } - - private void warning(String msg, Exception ex) { java.io.StringWriter sw = new java.io.StringWriter(); @@ -2809,34 +2207,6 @@ public final class ConnectionI extends IceInternal.EventHandler } } - private class ThreadPerConnection extends Thread - { - public void - run() - { - if(ConnectionI.this._instance.initializationData().threadHook != null) - { - ConnectionI.this._instance.initializationData().threadHook.start(); - } - - try - { - ConnectionI.this.run(); - } - catch(Exception ex) - { - ConnectionI.this.error("exception in thread per connection", ex); - } - finally - { - if(ConnectionI.this._instance.initializationData().threadHook != null) - { - ConnectionI.this._instance.initializationData().threadHook.stop(); - } - } - } - } - private static class OutgoingMessage { OutgoingMessage(IceInternal.BasicStream stream, boolean compress, boolean adopt) @@ -2919,8 +2289,46 @@ public final class ConnectionI extends IceInternal.EventHandler boolean prepared; } - private Thread _thread; - private final boolean _threadPerConnection; + static class SocketReadyCallback extends IceInternal.SelectorThread.SocketReadyCallback + { + public + SocketReadyCallback(ConnectionI connection) + { + _connection = connection; + } + + public java.nio.channels.SelectableChannel + fd() + { + return _connection.fd(); + } + + public boolean + hasMoreData() + { + return _connection.hasMoreData(); + } + + public IceInternal.SocketStatus + socketReady() + { + return _connection.socketReady(); + } + + public void + socketFinished() + { + _connection.socketFinished(); + } + + public void + runTimerTask() + { + _connection.socketTimeout(); + } + + final private ConnectionI _connection; + }; private IceInternal.Transceiver _transceiver; private Ice.BooleanHolder _hasMoreData = new Ice.BooleanHolder(false); @@ -2928,6 +2336,7 @@ public final class ConnectionI extends IceInternal.EventHandler private String _desc; private final String _type; private final IceInternal.EndpointI _endpoint; + private final SocketReadyCallback _socketReadyCallback = new SocketReadyCallback(this); private ObjectAdapter _adapter; private IceInternal.ServantManager _servantManager; @@ -2935,8 +2344,6 @@ public final class ConnectionI extends IceInternal.EventHandler private final Logger _logger; private final IceInternal.TraceLevels _traceLevels; - private boolean _registeredWithPool; - private int _finishedCount; private final IceInternal.ThreadPool _threadPool; private final IceInternal.SelectorThread _selectorThread; @@ -2965,10 +2372,20 @@ public final class ConnectionI extends IceInternal.EventHandler private boolean _batchRequestCompress; private int _batchMarker; - private java.util.LinkedList<OutgoingMessage> _queuedStreams = new java.util.LinkedList<OutgoingMessage>(); private java.util.LinkedList<OutgoingMessage> _sendStreams = new java.util.LinkedList<OutgoingMessage>(); private boolean _sendInProgress; + private java.util.List<OutgoingMessage> _sentCallbacks = new java.util.LinkedList<OutgoingMessage>(); + private IceInternal.ThreadPoolWorkItem _flushSentCallbacks = new IceInternal.ThreadPoolWorkItem() + { + public void + execute(IceInternal.ThreadPool threadPool) + { + threadPool.promoteFollower(null); + ConnectionI.this.flushSentCallbacks(); + }; + }; + private int _dispatchCount; private int _state; // The current state. diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java index 6998793fa06..9459d4bdb6a 100644 --- a/java/src/Ice/ObjectAdapterI.java +++ b/java/src/Ice/ObjectAdapterI.java @@ -725,15 +725,6 @@ public final class ObjectAdapterI implements ObjectAdapter return _servantManager; } - public boolean - getThreadPerConnection() - { - // - // No mutex lock necessary, _threadPerConnection is immutable. - // - return _threadPerConnection; - } - // // Only for use by IceInternal.ObjectAdapterFactory // @@ -821,23 +812,12 @@ public final class ObjectAdapterI implements ObjectAdapter try { - _threadPerConnection = properties.getPropertyAsInt(_name + ".ThreadPerConnection") > 0; - int threadPoolSize = properties.getPropertyAsInt(_name + ".ThreadPool.Size"); int threadPoolSizeMax = properties.getPropertyAsInt(_name + ".ThreadPool.SizeMax"); - if(_threadPerConnection && (threadPoolSize > 0 || threadPoolSizeMax > 0)) - { - InitializationException ex = new InitializationException(); - ex.reason = "object adapter `" + _name + "' cannot be configured for both\n" + - "thread pool and thread per connection"; - throw ex; - } - - if(!_threadPerConnection && threadPoolSize == 0 && threadPoolSizeMax == 0) - { - _threadPerConnection = _instance.threadPerConnection(); - } + // + // Create the per-adapter thread pool, if necessary. + // if(threadPoolSize > 0 || threadPoolSizeMax > 0) { _threadPool = new IceInternal.ThreadPool(_instance, _name + ".ThreadPool", 0); @@ -1372,8 +1352,6 @@ public final class ObjectAdapterI implements ObjectAdapter "ReplicaGroupId", "Router", "ProxyOptions", - "ThreadPerConnection", - "ThreadPerConnection.StackSize", "ThreadPool.Size", "ThreadPool.SizeMax", "ThreadPool.SizeWarn", @@ -1447,6 +1425,5 @@ public final class ObjectAdapterI implements ObjectAdapter private boolean _destroying; private boolean _destroyed; private boolean _noConfig; - private boolean _threadPerConnection; private Identity _processId = null; } diff --git a/java/src/Ice/ObjectPrx.java b/java/src/Ice/ObjectPrx.java index 4cf498fd19c..cc8bea254ff 100644 --- a/java/src/Ice/ObjectPrx.java +++ b/java/src/Ice/ObjectPrx.java @@ -34,9 +34,9 @@ public interface ObjectPrx boolean ice_invoke(String operation, OperationMode mode, byte[] inParams, ByteSeqHolder outParams, java.util.Map __context); - void ice_invoke_async(AMI_Object_ice_invoke cb, String operation, OperationMode mode, byte[] inParams); - void ice_invoke_async(AMI_Object_ice_invoke cb, String operation, OperationMode mode, byte[] inParams, - java.util.Map context); + boolean ice_invoke_async(AMI_Object_ice_invoke cb, String operation, OperationMode mode, byte[] inParams); + boolean ice_invoke_async(AMI_Object_ice_invoke cb, String operation, OperationMode mode, byte[] inParams, + java.util.Map context); Identity ice_getIdentity(); ObjectPrx ice_identity(Identity newIdentity); @@ -96,14 +96,11 @@ public interface ObjectPrx ObjectPrx ice_timeout(int t); ObjectPrx ice_connectionId(String connectionId); - boolean ice_isThreadPerConnection(); - ObjectPrx ice_threadPerConnection(boolean tpc); - Connection ice_getConnection(); Connection ice_getCachedConnection(); void ice_flushBatchRequests(); - void ice_flushBatchRequests_async(AMI_Object_ice_flushBatchRequests cb); + boolean ice_flushBatchRequests_async(AMI_Object_ice_flushBatchRequests cb); boolean equals(java.lang.Object r); } diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index 5621fbbbbbb..0b4bdbb048f 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -251,14 +251,14 @@ public class ObjectPrxHelperBase implements ObjectPrx } } - public final void + public final boolean ice_invoke_async(AMI_Object_ice_invoke cb, String operation, OperationMode mode, byte[] inParams) { __checkTwowayOnly("ice_invoke_async"); - cb.__invoke(this, operation, mode, inParams, null); + return cb.__invoke(this, operation, mode, inParams, null); } - public final void + public final boolean ice_invoke_async(AMI_Object_ice_invoke cb, String operation, OperationMode mode, byte[] inParams, java.util.Map context) { @@ -267,7 +267,7 @@ public class ObjectPrxHelperBase implements ObjectPrx context = _emptyContext; } __checkTwowayOnly("ice_invoke_async"); - cb.__invoke(this, operation, mode, inParams, context); + return cb.__invoke(this, operation, mode, inParams, context); } public final Identity @@ -680,26 +680,6 @@ public class ObjectPrxHelperBase implements ObjectPrx } } - public boolean - ice_isThreadPerConnection() - { - return _reference.getThreadPerConnection(); - } - - public ObjectPrx - ice_threadPerConnection(boolean tpc) - { - IceInternal.Reference ref = _reference.changeThreadPerConnection(tpc); - if(ref.equals(_reference)) - { - return this; - } - else - { - return newInstance(ref); - } - } - public final Connection ice_getConnection() { @@ -764,10 +744,10 @@ public class ObjectPrxHelperBase implements ObjectPrx } } - public void + public boolean ice_flushBatchRequests_async(AMI_Object_ice_flushBatchRequests cb) { - cb.__invoke(this); + return cb.__invoke(this); } public final boolean diff --git a/java/src/IceGridGUI/Coordinator.java b/java/src/IceGridGUI/Coordinator.java index 769335e1c71..8d9ebb0400c 100755 --- a/java/src/IceGridGUI/Coordinator.java +++ b/java/src/IceGridGUI/Coordinator.java @@ -1798,8 +1798,6 @@ public class Coordinator // properties.setProperty("Ice.Override.ConnectTimeout", "5000"); - properties.setProperty("Ice.ThreadPerConnection", "1"); - // // For Glacier // diff --git a/java/src/IceInternal/Acceptor.java b/java/src/IceInternal/Acceptor.java index 25de96607bc..4c2936175af 100644 --- a/java/src/IceInternal/Acceptor.java +++ b/java/src/IceInternal/Acceptor.java @@ -14,7 +14,6 @@ public interface Acceptor java.nio.channels.ServerSocketChannel fd(); void close(); void listen(); - Transceiver accept(int timeout); - void connectToSelf(); + Transceiver accept(); String toString(); } diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java index e9d07332a06..6596a4e3375 100644 --- a/java/src/IceInternal/BatchOutgoing.java +++ b/java/src/IceInternal/BatchOutgoing.java @@ -57,9 +57,9 @@ public final class BatchOutgoing implements OutgoingMessageCallback } public void - sent(boolean notify) + sent(boolean async) { - if(notify) + if(async) { synchronized(this) { diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 94c3c3721d6..7c0a1db0c0c 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -144,7 +144,7 @@ public class ConnectRequestHandler } } - public void + public boolean sendAsyncRequest(OutgoingAsync out) throws LocalExceptionWrapper { @@ -153,10 +153,10 @@ public class ConnectRequestHandler if(!initialized()) { _requests.add(new Request(out)); - return; + return false; } } - _connection.sendAsyncRequest(out, _compress, _response); + return _connection.sendAsyncRequest(out, _compress, _response); } public boolean @@ -165,7 +165,7 @@ public class ConnectRequestHandler return getConnection(true).flushBatchRequests(out); } - public void + public boolean flushAsyncBatchRequests(BatchOutgoingAsync out) { synchronized(this) @@ -173,10 +173,10 @@ public class ConnectRequestHandler if(!initialized()) { _requests.add(new Request(out)); - return; + return false; } } - _connection.flushAsyncBatchRequests(out); + return _connection.flushAsyncBatchRequests(out); } public Outgoing @@ -297,7 +297,7 @@ public class ConnectRequestHandler public void execute(ThreadPool threadPool) { - threadPool.promoteFollower(); + threadPool.promoteFollower(null); flushRequestsWithException(ex); }; }); @@ -392,6 +392,8 @@ public class ConnectRequestHandler _flushing = true; } + final java.util.List<OutgoingAsyncMessageCallback> sentCallbacks = + new java.util.ArrayList<OutgoingAsyncMessageCallback>(); try { java.util.Iterator<Request> p = _requests.iterator(); // _requests is immutable when _flushing = true @@ -400,11 +402,23 @@ public class ConnectRequestHandler Request request = p.next(); if(request.out != null) { - _connection.sendAsyncRequest(request.out, _compress, _response); + if(_connection.sendAsyncRequest(request.out, _compress, _response)) + { + if(request.out instanceof Ice.AMISentCallback) + { + sentCallbacks.add(request.out); + } + } } else if(request.batchOut != null) { - _connection.flushAsyncBatchRequests(request.batchOut); + if(_connection.flushAsyncBatchRequests(request.batchOut)) + { + if(request.batchOut instanceof Ice.AMISentCallback) + { + sentCallbacks.add(request.batchOut); + } + } } else { @@ -436,12 +450,10 @@ public class ConnectRequestHandler public void execute(ThreadPool threadPool) { - threadPool.promoteFollower(); + threadPool.promoteFollower(null); flushRequestsWithException(ex); }; }); - notifyAll(); - return; } } catch(final Ice.LocalException ex) @@ -455,15 +467,30 @@ public class ConnectRequestHandler public void execute(ThreadPool threadPool) { - threadPool.promoteFollower(); + threadPool.promoteFollower(null); flushRequestsWithException(ex); }; }); - notifyAll(); - return; } } - + + if(!sentCallbacks.isEmpty()) + { + final Instance instance = _reference.getInstance(); + instance.clientThreadPool().execute(new ThreadPoolWorkItem() + { + public void + execute(ThreadPool threadPool) + { + threadPool.promoteFollower(null); + for(OutgoingAsyncMessageCallback callback : sentCallbacks) + { + callback.__sent(instance); + } + }; + }); + } + // // We've finished sending the queued requests and the request handler now send // the requests over the connection directly. It's time to substitute the @@ -480,9 +507,12 @@ public class ConnectRequestHandler synchronized(this) { - assert(!_initialized); - _initialized = true; - _flushing = false; + if(_exception == null) + { + assert(!_initialized); + _initialized = true; + _flushing = false; + } _proxy = null; // Break cyclic reference count. _delegate = null; // Break cyclic reference count. notifyAll(); diff --git a/java/src/IceInternal/ConnectionRequestHandler.java b/java/src/IceInternal/ConnectionRequestHandler.java index 8b74c44b9f1..c02f9e580a2 100644 --- a/java/src/IceInternal/ConnectionRequestHandler.java +++ b/java/src/IceInternal/ConnectionRequestHandler.java @@ -39,15 +39,15 @@ public class ConnectionRequestHandler implements RequestHandler } else { - return null; // The request hasn't been sent yet. + return null; // The request has been sent. } } - public void + public boolean sendAsyncRequest(OutgoingAsync out) throws LocalExceptionWrapper { - _connection.sendAsyncRequest(out, _compress, _response); + return _connection.sendAsyncRequest(out, _compress, _response); } public boolean @@ -56,10 +56,10 @@ public class ConnectionRequestHandler implements RequestHandler return _connection.flushBatchRequests(out); } - public void + public boolean flushAsyncBatchRequests(BatchOutgoingAsync out) { - _connection.flushAsyncBatchRequests(out); + return _connection.flushAsyncBatchRequests(out); } public Outgoing diff --git a/java/src/IceInternal/Connector.java b/java/src/IceInternal/Connector.java index 7a0b27a0b40..cda93a65d91 100644 --- a/java/src/IceInternal/Connector.java +++ b/java/src/IceInternal/Connector.java @@ -11,7 +11,7 @@ package IceInternal; public interface Connector { - Transceiver connect(int timeout); + Transceiver connect(); short type(); String toString(); diff --git a/java/src/IceInternal/EventHandler.java b/java/src/IceInternal/EventHandler.java index ed6020fe304..5bb50523d2d 100644 --- a/java/src/IceInternal/EventHandler.java +++ b/java/src/IceInternal/EventHandler.java @@ -9,7 +9,7 @@ package IceInternal; -public abstract class EventHandler +public abstract class EventHandler extends SelectorHandler { // // Return true if the handler is for a datagram transport, false otherwise. @@ -28,14 +28,6 @@ public abstract class EventHandler abstract public boolean read(BasicStream is); // - // In Java, it's possible that the transceiver reads more data - // than what was really asked. If this is the case, hasMoreData() - // returns true and the handler read() method should be called - // again (without doing a select()). - // - abstract public boolean hasMoreData(); - - // // A complete message has been received. // abstract public void message(BasicStream stream, ThreadPool threadPool); @@ -77,4 +69,6 @@ public abstract class EventHandler // connection for validation. // protected BasicStream _stream; + boolean _serializing; + boolean _registered; } diff --git a/java/src/IceInternal/FixedReference.java b/java/src/IceInternal/FixedReference.java index 376b42ed1e2..afa5744cae9 100644 --- a/java/src/IceInternal/FixedReference.java +++ b/java/src/IceInternal/FixedReference.java @@ -73,12 +73,6 @@ public class FixedReference extends Reference return Ice.EndpointSelectionType.Random; } - public boolean - getThreadPerConnection() - { - return false; - } - public int getLocatorCacheTimeout() { @@ -133,12 +127,6 @@ public class FixedReference extends Reference throw new Ice.FixedProxyException(); } - public final Reference - changeThreadPerConnection(boolean newTpc) - { - throw new Ice.FixedProxyException(); - } - public Reference changeLocatorCacheTimeout(int newTimeout) { diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index 4efcaf79318..d231b2eea04 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -72,7 +72,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice public void waitUntilFinished() { - Thread threadPerIncomingConnectionFactory = null; java.util.LinkedList<Ice.ConnectionI> connections = null; synchronized(this) @@ -92,9 +91,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice } } - threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory; - _threadPerIncomingConnectionFactory = null; - // // Clear the OA. See bug 1673 for the details of why this is necessary. // @@ -110,21 +106,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice } } - if(threadPerIncomingConnectionFactory != null) - { - while(true) - { - try - { - threadPerIncomingConnectionFactory.join(); - break; - } - catch(InterruptedException ex) - { - } - } - } - if(connections != null) { java.util.ListIterator<Ice.ConnectionI> p = connections.listIterator(); @@ -193,35 +174,42 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice } // + // Operations from SelectorHandler. + // + + public java.nio.channels.SelectableChannel + fd() + { + assert(_acceptor != null); + return _acceptor.fd(); + } + + public boolean + hasMoreData() + { + assert(_acceptor != null); + return false; + } + + // // Operations from EventHandler. // public boolean datagram() { - assert(!_threadPerConnection); // Only for use with a thread pool. return _endpoint.datagram(); } public boolean readable() { - assert(!_threadPerConnection); // Only for use with a thread pool. return false; } public boolean read(BasicStream unused) { - assert(!_threadPerConnection); // Only for use with a thread pool. - assert(false); // Must not be called. - return false; - } - - public boolean - hasMoreData() - { - assert(!_threadPerConnection); // Only for use with a thread pool. assert(false); // Must not be called. return false; } @@ -229,8 +217,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice public void message(BasicStream unused, ThreadPool threadPool) { - assert(!_threadPerConnection); // Only for use with a thread pool. - Ice.ConnectionI connection = null; try @@ -262,7 +248,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice Transceiver transceiver = null; try { - transceiver = _acceptor.accept(0); + transceiver = _acceptor.accept(); } catch(Ice.SocketException ex) { @@ -288,8 +274,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice try { - assert(!_threadPerConnection); - connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter, false); + connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter); } catch(Ice.LocalException ex) { @@ -318,7 +303,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice // This makes sure that we promote a follower before we leave the scope of the mutex // above, but after we call accept() (if we call it). // - threadPool.promoteFollower(); + threadPool.promoteFollower(null); } connection.start(this); @@ -327,19 +312,12 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice public synchronized void finished(ThreadPool threadPool) { - assert(!_threadPerConnection); // Only for use with a thread pool. - - threadPool.promoteFollower(); - assert(threadPool == ((Ice.ObjectAdapterI)_adapter).getThreadPool()); + threadPool.promoteFollower(null); + assert(threadPool == ((Ice.ObjectAdapterI)_adapter).getThreadPool() && _state == StateClosed); - --_finishedCount; - - if(_finishedCount == 0 && _state == StateClosed) - { - _acceptor.close(); - _acceptor = null; - notifyAll(); - } + _acceptor.close(); + _acceptor = null; + notifyAll(); } public void @@ -406,8 +384,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice super(instance); _endpoint = endpoint; _adapter = adapter; - _registeredWithPool = false; - _finishedCount = 0; _warn = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; _state = StateHolding; @@ -423,8 +399,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice } Ice.ObjectAdapterI adapterImpl = (Ice.ObjectAdapterI)_adapter; - _threadPerConnection = adapterImpl.getThreadPerConnection(); - try { EndpointIHolder h = new EndpointIHolder(); @@ -438,8 +412,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice Ice.ConnectionI connection; try { - connection = new Ice.ConnectionI(_instance, _transceiver, _endpoint, _adapter, - _threadPerConnection); + connection = new Ice.ConnectionI(_instance, _transceiver, _endpoint, _adapter); } catch(Ice.LocalException ex) { @@ -464,25 +437,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice _endpoint = h.value; assert(_acceptor != null); _acceptor.listen(); - - if(_threadPerConnection) - { - // - // If we are in thread per connection mode, we also use - // one thread per incoming connection factory, that - // accepts new connections on this endpoint. - // - try - { - _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(); - _threadPerIncomingConnectionFactory.start(); - } - catch(java.lang.Exception ex) - { - error("cannot create thread for incoming connection factory", ex); - throw ex; - } - } } } catch(java.lang.Exception ex) @@ -508,7 +462,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice _state = StateClosed; _acceptor = null; _connections = null; - _threadPerIncomingConnectionFactory = null; } if(ex instanceof Ice.LocalException) @@ -531,7 +484,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice IceUtilInternal.Assert.FinalizerAssert(_state == StateClosed); IceUtilInternal.Assert.FinalizerAssert(_acceptor == null); IceUtilInternal.Assert.FinalizerAssert(_connections == null); - IceUtilInternal.Assert.FinalizerAssert(_threadPerIncomingConnectionFactory == null); super.finalize(); } @@ -556,9 +508,9 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice { return; } - if(!_threadPerConnection && _acceptor != null) + if(_acceptor != null) { - registerWithPool(); + ((Ice.ObjectAdapterI)_adapter).getThreadPool()._register(this); } java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator(); @@ -576,9 +528,9 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice { return; } - if(!_threadPerConnection && _acceptor != null) + if(_acceptor != null) { - unregisterWithPool(); + ((Ice.ObjectAdapterI)_adapter).getThreadPool().unregister(this); } java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator(); @@ -594,25 +546,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice { if(_acceptor != null) { - if(_threadPerConnection) - { - // - // If we are in thread per connection mode, we connect - // to our own acceptor, which unblocks our thread per - // incoming connection factory stuck in accept(). - // - _acceptor.connectToSelf(); - } - else - { - // - // Otherwise we first must make sure that we are - // registered, then we unregister, and let finished() - // do the close. - // - registerWithPool(); - unregisterWithPool(); - } + ((Ice.ObjectAdapterI)_adapter).getThreadPool().finish(this); } java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator(); @@ -630,33 +564,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice } private void - registerWithPool() - { - assert(!_threadPerConnection); // Only for use with a thread pool. - assert(_acceptor != null); - - if(!_registeredWithPool) - { - ((Ice.ObjectAdapterI)_adapter).getThreadPool()._register(_acceptor.fd(), this); - _registeredWithPool = true; - } - } - - private void - unregisterWithPool() - { - assert(!_threadPerConnection); // Only for use with a thread pool. - assert(_acceptor != null); - - if(_registeredWithPool) - { - ((Ice.ObjectAdapterI)_adapter).getThreadPool().unregister(_acceptor.fd()); - _registeredWithPool = false; - ++_finishedCount; // For each unregistration, finished() is called once. - } - } - - private void warning(Ice.LocalException ex) { java.io.StringWriter sw = new java.io.StringWriter(); @@ -678,168 +585,15 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice _instance.initializationData().logger.error(s); } - private void - run() - { - assert(_acceptor != null); - - while(true) - { - // - // We must accept new connections outside the thread - // synchronization, because we use blocking accept. - // - Transceiver transceiver = null; - try - { - transceiver = _acceptor.accept(-1); - } - catch(Ice.SocketException ex) - { - // Ignore socket exceptions. - } - catch(Ice.TimeoutException ex) - { - // Ignore timeouts. - } - catch(Ice.LocalException ex) - { - // Warn about other Ice local exceptions. - if(_warn) - { - warning(ex); - } - } - - Ice.ConnectionI connection = null; - synchronized(this) - { - while(_state == StateHolding) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - if(_state == StateClosed) - { - if(transceiver != null) - { - try - { - transceiver.close(); - } - catch(Ice.LocalException ex) - { - // Here we ignore any exceptions in close(). - } - } - - try - { - _acceptor.close(); - } - catch(Ice.LocalException ex) - { - _acceptor = null; - notifyAll(); - throw ex; - } - - _acceptor = null; - notifyAll(); - return; - } - - assert(_state == StateActive); - - // - // Reap connections for which destruction has completed. - // - java.util.ListIterator<Ice.ConnectionI> p = _connections.listIterator(); - while(p.hasNext()) - { - Ice.ConnectionI con = p.next(); - if(con.isFinished()) - { - p.remove(); - } - } - - if(transceiver == null) - { - continue; - } - - try - { - connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter, _threadPerConnection); - } - catch(Ice.LocalException ex) - { - try - { - transceiver.close(); - } - catch(Ice.LocalException exc) - { - // Ignore - } - - if(_warn) - { - warning(ex); - } - continue; - } - _connections.add(connection); - } - - // - // In thread-per-connection mode and regardless of the background mode, - // start() doesn't block. The connection thread is started and takes - // care of the connection validation and notifies the factory through - // the callback when it's done. - // - connection.start(this); - } - } - - private class ThreadPerIncomingConnectionFactory extends Thread - { - public void - run() - { - try - { - IncomingConnectionFactory.this.run(); - } - catch(Exception ex) - { - IncomingConnectionFactory.this.error("exception in thread per incoming connection factory", ex); - } - } - } - private Thread _threadPerIncomingConnectionFactory; - private Acceptor _acceptor; private final Transceiver _transceiver; private EndpointI _endpoint; private Ice.ObjectAdapter _adapter; - private boolean _registeredWithPool; - private int _finishedCount; - private final boolean _warn; private java.util.List<Ice.ConnectionI> _connections = new java.util.LinkedList<Ice.ConnectionI>(); private int _state; - - private final boolean _threadPerConnection; } diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index b14f46db598..2a47cba9112 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -217,24 +217,6 @@ public final class Instance return _timer; } - public boolean - threadPerConnection() - { - return _threadPerConnection; - } - - public int - threadPerConnectionStackSize() - { - return _threadPerConnectionStackSize; - } - - public boolean - background() - { - return _background; - } - public synchronized EndpointFactoryManager endpointFactoryManager() { @@ -680,19 +662,6 @@ public final class Instance _implicitContext = Ice.ImplicitContextI.create(_initData.properties.getProperty("Ice.ImplicitContext")); - _threadPerConnection = _initData.properties.getPropertyAsInt("Ice.ThreadPerConnection") > 0; - - { - int stackSize = _initData.properties.getPropertyAsInt("Ice.ThreadPerConnection.StackSize"); - if(stackSize < 0) - { - stackSize = 0; - } - _threadPerConnectionStackSize = stackSize; - } - - _background = _initData.properties.getPropertyAsInt("Ice.Background") > 0; - _routerManager = new RouterManager(); _locatorManager = new LocatorManager(); @@ -1082,9 +1051,6 @@ public final class Instance private SelectorThread _selectorThread; private EndpointHostResolver _endpointHostResolver; private Timer _timer; - private final boolean _threadPerConnection; - private final int _threadPerConnectionStackSize; - private final boolean _background; private EndpointFactoryManager _endpointFactoryManager; private Ice.PluginManager _pluginManager; private java.util.Map<String, String> _defaultContext; diff --git a/java/src/IceInternal/Network.java b/java/src/IceInternal/Network.java index 6f205bcda0e..1bdd30b54e4 100644 --- a/java/src/IceInternal/Network.java +++ b/java/src/IceInternal/Network.java @@ -280,27 +280,13 @@ public final class Network } public static boolean - doConnect(java.nio.channels.SocketChannel fd, java.net.InetSocketAddress addr, int timeout) + doConnect(java.nio.channels.SocketChannel fd, java.net.InetSocketAddress addr) { try { if(!fd.connect(addr)) { - if(timeout == 0) - { - return false; - } - - try - { - doFinishConnect(fd, timeout); - } - catch(Ice.LocalException ex) - { - closeSocketNoThrow(fd); - throw ex; - } - return true; + return false; } } catch(java.net.ConnectException ex) @@ -336,75 +322,13 @@ public final class Network } public static void - doFinishConnect(java.nio.channels.SocketChannel fd, int timeout) + doFinishConnect(java.nio.channels.SocketChannel fd) { // // Note: we don't close the socket if there's an exception. It's the responsibility // of the caller to do so. // - if(timeout != 0) - { - try - { - java.nio.channels.Selector selector = java.nio.channels.Selector.open(); - try - { - while(true) - { - try - { - java.nio.channels.SelectionKey key = - fd.register(selector, java.nio.channels.SelectionKey.OP_CONNECT); - int n; - if(timeout > 0) - { - n = selector.select(timeout); - } - else - { - n = selector.select(); - } - - if(n == 0) - { - throw new Ice.ConnectTimeoutException(); - } - - break; - } - catch(java.io.IOException ex) - { - if(interrupted(ex)) - { - continue; - } - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - } - finally - { - try - { - selector.close(); - } - catch(java.io.IOException ex) - { - // Ignore - } - } - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - try { if(!fd.finishConnect()) @@ -446,7 +370,7 @@ public final class Network } public static void - doConnect(java.nio.channels.DatagramChannel fd, java.net.InetSocketAddress addr, int timeout) + doConnect(java.nio.channels.DatagramChannel fd, java.net.InetSocketAddress addr) { try { @@ -949,58 +873,18 @@ public final class Network createPipe() { SocketPair fds = new SocketPair(); - - // - // BUGFIX: This method should really be very simple. - // Unfortunately, using a pipe causes a kernel crash under - // MacOS 10.3.9. - // - //try - //{ - // java.nio.channels.Pipe pipe = java.nio.channels.Pipe.open(); - // fds.sink = pipe.sink(); - // fds.source = pipe.source(); - //} - //catch(java.io.IOException ex) - //{ - // Ice.SocketException se = new Ice.SocketException(); - // se.initCause(ex); - // throw se; - //} - // - - java.nio.channels.ServerSocketChannel fd = createTcpServerSocket(); - - java.net.InetSocketAddress addr = new java.net.InetSocketAddress("127.0.0.1", 0); - - addr = doBind(fd, addr, 0); - try { - java.nio.channels.SocketChannel sink = createTcpSocket(); - fds.sink = sink; - doConnect(sink, addr, -1); - try - { - fds.source = doAccept(fd, -1); - } - catch(Ice.LocalException ex) - { - try - { - fds.sink.close(); - } - catch(java.io.IOException e) - { - } - throw ex; - } + java.nio.channels.Pipe pipe = java.nio.channels.Pipe.open(); + fds.sink = pipe.sink(); + fds.source = pipe.source(); } - finally + catch(java.io.IOException ex) { - closeSocketNoThrow(fd); + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; } - return fds; } diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java index a454993f852..1a72c7d8aa4 100644 --- a/java/src/IceInternal/Outgoing.java +++ b/java/src/IceInternal/Outgoing.java @@ -258,9 +258,9 @@ public final class Outgoing implements OutgoingMessageCallback } public void - sent(boolean notify) + sent(boolean async) { - if(notify) + if(async) { synchronized(this) { diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index ee90ed1d2a5..9359fe4e36f 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -280,7 +280,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback __send(); } - public final void + public final boolean __send() { try @@ -288,8 +288,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback _sent = false; _response = false; _delegate = _proxy.__getDelegate(true); - _delegate.__getRequestHandler().sendAsyncRequest(this); - return; + _sentSynchronously = _delegate.__getRequestHandler().sendAsyncRequest(this); } catch(LocalExceptionWrapper ex) { @@ -299,6 +298,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback { handleException(ex); } + return _sentSynchronously; } protected final void @@ -470,6 +470,7 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback } private boolean _sent; + private boolean _sentSynchronously; private boolean _response; private Ice.ObjectPrxHelperBase _proxy; private Ice._ObjectDel _delegate; diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java index 78b5ddb17c9..a0d21e7ad90 100644 --- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java +++ b/java/src/IceInternal/OutgoingAsyncMessageCallback.java @@ -22,6 +22,19 @@ abstract public class OutgoingAsyncMessageCallback } public void + __sent(Instance instance) + { + try + { + ((Ice.AMISentCallback)this).ice_sent(); + } + catch(java.lang.Exception ex) + { + __warning(instance, ex); + } + } + + public void __exception(Ice.LocalException exc) { try @@ -94,7 +107,7 @@ abstract public class OutgoingAsyncMessageCallback public void execute(ThreadPool threadPool) { - threadPool.promoteFollower(); + threadPool.promoteFollower(null); __exception(ex); } }); @@ -125,20 +138,25 @@ abstract public class OutgoingAsyncMessageCallback protected void __warning(java.lang.Exception ex) { - if(__os != null) // Don't print anything if release() was already called. + if(__os != null) { - Instance instance = __os.instance(); - if(instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - IceUtilInternal.OutputBase out = new IceUtilInternal.OutputBase(pw); - out.setUseTab(false); - out.print("exception raised by AMI callback:\n"); - ex.printStackTrace(pw); - pw.flush(); - instance.initializationData().logger.warning(sw.toString()); - } + __warning(__os.instance(), ex); + } + } + + protected void + __warning(Instance instance, java.lang.Exception ex) + { + if(instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + IceUtilInternal.OutputBase out = new IceUtilInternal.OutputBase(pw); + out.setUseTab(false); + out.print("exception raised by AMI callback:\n"); + ex.printStackTrace(pw); + pw.flush(); + instance.initializationData().logger.warning(sw.toString()); } } diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index fec7e825ab0..58177812f6d 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -106,8 +106,7 @@ public final class OutgoingConnectionFactory } public Ice.ConnectionI - create(EndpointI[] endpts, boolean hasMore, boolean tpc, Ice.EndpointSelectionType selType, - Ice.BooleanHolder compress) + create(EndpointI[] endpts, boolean hasMore, Ice.EndpointSelectionType selType, Ice.BooleanHolder compress) { assert(endpts.length > 0); @@ -119,7 +118,7 @@ public final class OutgoingConnectionFactory // // Try to find a connection to one of the given endpoints. // - Ice.ConnectionI connection = findConnection(endpoints, tpc, compress); + Ice.ConnectionI connection = findConnectionByEndpoint(endpoints, compress); if(connection != null) { return connection; @@ -156,7 +155,7 @@ public final class OutgoingConnectionFactory java.util.Iterator<Connector> q = cons.iterator(); while(q.hasNext()) { - connectors.add(new ConnectorInfo(q.next(), endpoint, tpc)); + connectors.add(new ConnectorInfo(q.next(), endpoint)); } } catch(Ice.LocalException ex) @@ -194,21 +193,7 @@ public final class OutgoingConnectionFactory ConnectorInfo ci = q.next(); try { - int timeout; - if(defaultsAndOverrides.overrideConnectTimeout) - { - timeout = defaultsAndOverrides.overrideConnectTimeoutValue; - } - else - { - // - // It is not necessary to check for overrideTimeout, the endpoint has already - // been modified with this override, if set. - // - timeout = ci.endpoint.timeout(); - } - - connection = createConnection(ci.connector.connect(timeout), ci); + connection = createConnection(ci.connector.connect(), ci); connection.start(null); if(defaultsAndOverrides.overrideCompress) @@ -253,7 +238,7 @@ public final class OutgoingConnectionFactory } public void - create(EndpointI[] endpts, boolean hasMore, boolean tpc, Ice.EndpointSelectionType selType, + create(EndpointI[] endpts, boolean hasMore, Ice.EndpointSelectionType selType, CreateConnectionCallback callback) { assert(endpts.length > 0); @@ -269,7 +254,7 @@ public final class OutgoingConnectionFactory try { Ice.BooleanHolder compress = new Ice.BooleanHolder(); - Ice.ConnectionI connection = findConnection(endpoints, tpc, compress); + Ice.ConnectionI connection = findConnectionByEndpoint(endpoints, compress); if(connection != null) { callback.setConnection(connection, compress.value); @@ -282,7 +267,7 @@ public final class OutgoingConnectionFactory return; } - ConnectCallback cb = new ConnectCallback(this, endpoints, hasMore, callback, selType, tpc); + ConnectCallback cb = new ConnectCallback(this, endpoints, hasMore, callback, selType); cb.getConnectors(); } @@ -469,7 +454,7 @@ public final class OutgoingConnectionFactory } synchronized private Ice.ConnectionI - findConnection(java.util.List<EndpointI> endpoints, boolean tpc, Ice.BooleanHolder compress) + findConnectionByEndpoint(java.util.List<EndpointI> endpoints, Ice.BooleanHolder compress) { if(_destroyed) { @@ -493,8 +478,7 @@ public final class OutgoingConnectionFactory while(q.hasNext()) { Ice.ConnectionI connection = q.next(); - if(connection.isActiveOrHolding() && - connection.threadPerConnection() == tpc) // Don't return destroyed or un-validated connections + if(connection.isActiveOrHolding()) // Don't return destroyed or un-validated connections { if(defaultsAndOverrides.overrideCompress) { @@ -787,8 +771,7 @@ public final class OutgoingConnectionFactory throw new Ice.CommunicatorDestroyedException(); } - Ice.ConnectionI connection = new Ice.ConnectionI(_instance, transceiver, ci.endpoint.compress(false), - null, ci.threadPerConnection); + Ice.ConnectionI connection = new Ice.ConnectionI(_instance, transceiver, ci.endpoint.compress(false),null); java.util.List<Ice.ConnectionI> connectionList = _connections.get(ci); if(connectionList == null) @@ -897,10 +880,7 @@ public final class OutgoingConnectionFactory // If the connection is finished, we remove it right away instead of // waiting for the reaping. // - // NOTE: it's possible for the connection to not be finished yet. That's - // for instance the case when using thread per connection and if it's the - // thread which is calling back the outgoing connection factory to notify - // it of the failure. + // NOTE: it's possible for the connection to not be finished yet. // synchronized(this) { @@ -957,46 +937,39 @@ public final class OutgoingConnectionFactory private static class ConnectorInfo { - public ConnectorInfo(Connector c, EndpointI e, boolean t) + public ConnectorInfo(Connector c, EndpointI e) { connector = c; endpoint = e; - threadPerConnection = t; } public boolean equals(Object obj) { ConnectorInfo r = (ConnectorInfo)obj; - if(threadPerConnection != r.threadPerConnection) - { - return false; - } return connector.equals(r.connector); } public int hashCode() { - return 2 * connector.hashCode() + (threadPerConnection ? 0 : 1); + return connector.hashCode(); } public Connector connector; public EndpointI endpoint; - public boolean threadPerConnection; } private static class ConnectCallback implements Ice.ConnectionI.StartCallback, EndpointI_connectors { ConnectCallback(OutgoingConnectionFactory f, java.util.List<EndpointI> endpoints, boolean more, - CreateConnectionCallback cb, Ice.EndpointSelectionType selType, boolean threadPerConnection) + CreateConnectionCallback cb, Ice.EndpointSelectionType selType) { _factory = f; _endpoints = endpoints; _hasMore = more; _callback = cb; _selType = selType; - _threadPerConnection = threadPerConnection; _endpointsIter = _endpoints.iterator(); } @@ -1063,7 +1036,7 @@ public final class OutgoingConnectionFactory java.util.Iterator<Connector> q = cons.iterator(); while(q.hasNext()) { - _connectors.add(new ConnectorInfo(q.next(), _currentEndpoint, _threadPerConnection)); + _connectors.add(new ConnectorInfo(q.next(), _currentEndpoint)); } if(_endpointsIter.hasNext()) @@ -1183,7 +1156,7 @@ public final class OutgoingConnectionFactory { assert(_iter.hasNext()); _current = _iter.next(); - connection = _factory.createConnection(_current.connector.connect(0), _current); + connection = _factory.createConnection(_current.connector.connect(), _current); connection.start(this); } catch(Ice.LocalException ex) @@ -1197,7 +1170,6 @@ public final class OutgoingConnectionFactory private final CreateConnectionCallback _callback; private final java.util.List<EndpointI> _endpoints; private final Ice.EndpointSelectionType _selType; - private final boolean _threadPerConnection; private java.util.Iterator<EndpointI> _endpointsIter; private EndpointI _currentEndpoint; private java.util.List<ConnectorInfo> _connectors = new java.util.ArrayList<ConnectorInfo>(); diff --git a/java/src/IceInternal/PropertyNames.java b/java/src/IceInternal/PropertyNames.java index ef861c56542..2d9dafb23f5 100644 --- a/java/src/IceInternal/PropertyNames.java +++ b/java/src/IceInternal/PropertyNames.java @@ -7,7 +7,7 @@ // // ********************************************************************** // -// Generated by makeprops.py from file ../config/PropertyNames.xml, Wed Feb 27 12:48:24 2008 +// Generated by makeprops.py from file ../config/PropertyNames.xml, Mon Mar 3 22:29:56 2008 // IMPORTANT: Do not edit this file -- any edits made here will be lost! @@ -27,12 +27,11 @@ public final class PropertyNames new Property("Ice\\.Admin\\.ReplicaGroupId", false, null), new Property("Ice\\.Admin\\.Router", false, null), new Property("Ice\\.Admin\\.ProxyOptions", false, null), - new Property("Ice\\.Admin\\.ThreadPerConnection", false, null), - new Property("Ice\\.Admin\\.ThreadPerConnection\\.StackSize", false, null), new Property("Ice\\.Admin\\.ThreadPool\\.Size", false, null), new Property("Ice\\.Admin\\.ThreadPool\\.SizeMax", false, null), new Property("Ice\\.Admin\\.ThreadPool\\.SizeWarn", false, null), new Property("Ice\\.Admin\\.ThreadPool\\.StackSize", false, null), + new Property("Ice\\.Admin\\.ThreadPool\\.Serialize", false, null), new Property("Ice\\.Admin\\.DelayCreation", false, null), new Property("Ice\\.Admin\\.Facets", false, null), new Property("Ice\\.Admin\\.InstanceName", false, null), @@ -53,7 +52,6 @@ public final class PropertyNames new Property("Ice\\.Default\\.Locator\\.Router", false, null), new Property("Ice\\.Default\\.Locator\\.CollocationOptimization", true, "Ice.Default.Locator.CollocationOptimized"), new Property("Ice\\.Default\\.Locator\\.CollocationOptimized", false, null), - new Property("Ice\\.Default\\.Locator\\.ThreadPerConnection", false, null), new Property("Ice\\.Default\\.Locator", false, null), new Property("Ice\\.Default\\.LocatorCacheTimeout", false, null), new Property("Ice\\.Default\\.Package", false, null), @@ -67,7 +65,6 @@ public final class PropertyNames new Property("Ice\\.Default\\.Router\\.Router", false, null), new Property("Ice\\.Default\\.Router\\.CollocationOptimization", true, "Ice.Default.Router.CollocationOptimized"), new Property("Ice\\.Default\\.Router\\.CollocationOptimized", false, null), - new Property("Ice\\.Default\\.Router\\.ThreadPerConnection", false, null), new Property("Ice\\.Default\\.Router", false, null), new Property("Ice\\.IPv4", false, null), new Property("Ice\\.IPv6", false, null), @@ -95,16 +92,16 @@ public final class PropertyNames new Property("Ice\\.ServerIdleTime", false, null), new Property("Ice\\.StdErr", false, null), new Property("Ice\\.StdOut", false, null), - new Property("Ice\\.ThreadPerConnection", false, null), - new Property("Ice\\.ThreadPerConnection\\.StackSize", false, null), new Property("Ice\\.ThreadPool\\.Client\\.Size", false, null), new Property("Ice\\.ThreadPool\\.Client\\.SizeMax", false, null), new Property("Ice\\.ThreadPool\\.Client\\.SizeWarn", false, null), new Property("Ice\\.ThreadPool\\.Client\\.StackSize", false, null), + new Property("Ice\\.ThreadPool\\.Client\\.Serialize", false, null), new Property("Ice\\.ThreadPool\\.Server\\.Size", false, null), new Property("Ice\\.ThreadPool\\.Server\\.SizeMax", false, null), new Property("Ice\\.ThreadPool\\.Server\\.SizeWarn", false, null), new Property("Ice\\.ThreadPool\\.Server\\.StackSize", false, null), + new Property("Ice\\.ThreadPool\\.Server\\.Serialize", false, null), new Property("Ice\\.Trace\\.GC", false, null), new Property("Ice\\.Trace\\.Location", true, "Ice.Trace.Locator"), new Property("Ice\\.Trace\\.Locator", false, null), @@ -144,12 +141,11 @@ public final class PropertyNames new Property("IceBox\\.ServiceManager\\.ReplicaGroupId", false, null), new Property("IceBox\\.ServiceManager\\.Router", false, null), new Property("IceBox\\.ServiceManager\\.ProxyOptions", false, null), - new Property("IceBox\\.ServiceManager\\.ThreadPerConnection", false, null), - new Property("IceBox\\.ServiceManager\\.ThreadPerConnection\\.StackSize", false, null), new Property("IceBox\\.ServiceManager\\.ThreadPool\\.Size", false, null), new Property("IceBox\\.ServiceManager\\.ThreadPool\\.SizeMax", false, null), new Property("IceBox\\.ServiceManager\\.ThreadPool\\.SizeWarn", false, null), new Property("IceBox\\.ServiceManager\\.ThreadPool\\.StackSize", false, null), + new Property("IceBox\\.ServiceManager\\.ThreadPool\\.Serialize", false, null), new Property("IceBox\\.Trace\\.ServiceObserver", false, null), new Property("IceBox\\.UseSharedCommunicator\\.[^\\s]+", false, null), null @@ -165,7 +161,6 @@ public final class PropertyNames new Property("IceBoxAdmin\\.ServiceManager\\.Proxy\\.Router", false, null), new Property("IceBoxAdmin\\.ServiceManager\\.Proxy\\.CollocationOptimization", true, "IceBoxAdmin.ServiceManager.Proxy.CollocationOptimized"), new Property("IceBoxAdmin\\.ServiceManager\\.Proxy\\.CollocationOptimized", false, null), - new Property("IceBoxAdmin\\.ServiceManager\\.Proxy\\.ThreadPerConnection", false, null), new Property("IceBoxAdmin\\.ServiceManager\\.Proxy", false, null), null }; @@ -192,12 +187,11 @@ public final class PropertyNames new Property("IceGrid\\.Node\\.ReplicaGroupId", false, null), new Property("IceGrid\\.Node\\.Router", false, null), new Property("IceGrid\\.Node\\.ProxyOptions", false, null), - new Property("IceGrid\\.Node\\.ThreadPerConnection", false, null), - new Property("IceGrid\\.Node\\.ThreadPerConnection\\.StackSize", false, null), new Property("IceGrid\\.Node\\.ThreadPool\\.Size", false, null), new Property("IceGrid\\.Node\\.ThreadPool\\.SizeMax", false, null), new Property("IceGrid\\.Node\\.ThreadPool\\.SizeWarn", false, null), new Property("IceGrid\\.Node\\.ThreadPool\\.StackSize", false, null), + new Property("IceGrid\\.Node\\.ThreadPool\\.Serialize", false, null), new Property("IceGrid\\.Node\\.AllowRunningServersAsRoot", false, null), new Property("IceGrid\\.Node\\.AllowEndpointsOverride", false, null), new Property("IceGrid\\.Node\\.CollocateRegistry", false, null), @@ -222,7 +216,6 @@ public final class PropertyNames new Property("IceGrid\\.Node\\.UserAccountMapper\\.Router", false, null), new Property("IceGrid\\.Node\\.UserAccountMapper\\.CollocationOptimization", true, "IceGrid.Node.UserAccountMapper.CollocationOptimized"), new Property("IceGrid\\.Node\\.UserAccountMapper\\.CollocationOptimized", false, null), - new Property("IceGrid\\.Node\\.UserAccountMapper\\.ThreadPerConnection", false, null), new Property("IceGrid\\.Node\\.UserAccountMapper", false, null), new Property("IceGrid\\.Node\\.WaitTime", false, null), new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.AdapterId", false, null), @@ -233,12 +226,11 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ReplicaGroupId", false, null), new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.Router", false, null), new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ProxyOptions", false, null), - new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPerConnection", false, null), - new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPerConnection\\.StackSize", false, null), new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPool\\.Size", false, null), new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPool\\.SizeMax", false, null), new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPool\\.SizeWarn", false, null), new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPool\\.StackSize", false, null), + new Property("IceGrid\\.Registry\\.AdminCallbackRouter\\.ThreadPool\\.Serialize", false, null), new Property("IceGrid\\.Registry\\.AdminCryptPasswords", false, null), new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier\\.EndpointSelection", false, null), new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier\\.ConnectionCached", false, null), @@ -248,7 +240,6 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier\\.Router", false, null), new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier\\.CollocationOptimization", true, "IceGrid.Registry.AdminPermissionsVerifier.CollocationOptimized"), new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier\\.CollocationOptimized", false, null), - new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier\\.ThreadPerConnection", false, null), new Property("IceGrid\\.Registry\\.AdminPermissionsVerifier", false, null), new Property("IceGrid\\.Registry\\.AdminSessionFilters", false, null), new Property("IceGrid\\.Registry\\.AdminSessionManager\\.AdapterId", false, null), @@ -259,12 +250,11 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ReplicaGroupId", false, null), new Property("IceGrid\\.Registry\\.AdminSessionManager\\.Router", false, null), new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ProxyOptions", false, null), - new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPerConnection", false, null), - new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPerConnection\\.StackSize", false, null), new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.Size", false, null), new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.SizeMax", false, null), new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.SizeWarn", false, null), new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.StackSize", false, null), + new Property("IceGrid\\.Registry\\.AdminSessionManager\\.ThreadPool\\.Serialize", false, null), new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.EndpointSelection", false, null), new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.ConnectionCached", false, null), new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.PreferSecure", false, null), @@ -273,7 +263,6 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.Router", false, null), new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.CollocationOptimization", true, "IceGrid.Registry.AdminSSLPermissionsVerifier.CollocationOptimized"), new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.CollocationOptimized", false, null), - new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier\\.ThreadPerConnection", false, null), new Property("IceGrid\\.Registry\\.AdminSSLPermissionsVerifier", false, null), new Property("IceGrid\\.Registry\\.Client\\.AdapterId", false, null), new Property("IceGrid\\.Registry\\.Client\\.Endpoints", false, null), @@ -283,12 +272,11 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.Client\\.ReplicaGroupId", false, null), new Property("IceGrid\\.Registry\\.Client\\.Router", false, null), new Property("IceGrid\\.Registry\\.Client\\.ProxyOptions", false, null), - new Property("IceGrid\\.Registry\\.Client\\.ThreadPerConnection", false, null), - new Property("IceGrid\\.Registry\\.Client\\.ThreadPerConnection\\.StackSize", false, null), new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.Size", false, null), new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.SizeMax", false, null), new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.SizeWarn", false, null), new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.StackSize", false, null), + new Property("IceGrid\\.Registry\\.Client\\.ThreadPool\\.Serialize", false, null), new Property("IceGrid\\.Registry\\.CryptPasswords", false, null), new Property("IceGrid\\.Registry\\.Data", false, null), new Property("IceGrid\\.Registry\\.DefaultTemplates", false, null), @@ -301,12 +289,11 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.Internal\\.ReplicaGroupId", false, null), new Property("IceGrid\\.Registry\\.Internal\\.Router", false, null), new Property("IceGrid\\.Registry\\.Internal\\.ProxyOptions", false, null), - new Property("IceGrid\\.Registry\\.Internal\\.ThreadPerConnection", false, null), - new Property("IceGrid\\.Registry\\.Internal\\.ThreadPerConnection\\.StackSize", false, null), new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.Size", false, null), new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.SizeMax", false, null), new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.SizeWarn", false, null), new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.StackSize", false, null), + new Property("IceGrid\\.Registry\\.Internal\\.ThreadPool\\.Serialize", false, null), new Property("IceGrid\\.Registry\\.NodeSessionTimeout", false, null), new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.EndpointSelection", false, null), new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.ConnectionCached", false, null), @@ -316,7 +303,6 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.Router", false, null), new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.CollocationOptimization", true, "IceGrid.Registry.PermissionsVerifier.CollocationOptimized"), new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.CollocationOptimized", false, null), - new Property("IceGrid\\.Registry\\.PermissionsVerifier\\.ThreadPerConnection", false, null), new Property("IceGrid\\.Registry\\.PermissionsVerifier", false, null), new Property("IceGrid\\.Registry\\.ReplicaName", false, null), new Property("IceGrid\\.Registry\\.ReplicaSessionTimeout", false, null), @@ -328,12 +314,11 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.Server\\.ReplicaGroupId", false, null), new Property("IceGrid\\.Registry\\.Server\\.Router", false, null), new Property("IceGrid\\.Registry\\.Server\\.ProxyOptions", false, null), - new Property("IceGrid\\.Registry\\.Server\\.ThreadPerConnection", false, null), - new Property("IceGrid\\.Registry\\.Server\\.ThreadPerConnection\\.StackSize", false, null), new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.Size", false, null), new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.SizeMax", false, null), new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.SizeWarn", false, null), new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.StackSize", false, null), + new Property("IceGrid\\.Registry\\.Server\\.ThreadPool\\.Serialize", false, null), new Property("IceGrid\\.Registry\\.SessionFilters", false, null), new Property("IceGrid\\.Registry\\.SessionManager\\.AdapterId", false, null), new Property("IceGrid\\.Registry\\.SessionManager\\.Endpoints", false, null), @@ -343,12 +328,11 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.SessionManager\\.ReplicaGroupId", false, null), new Property("IceGrid\\.Registry\\.SessionManager\\.Router", false, null), new Property("IceGrid\\.Registry\\.SessionManager\\.ProxyOptions", false, null), - new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPerConnection", false, null), - new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPerConnection\\.StackSize", false, null), new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.Size", false, null), new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.SizeMax", false, null), new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.SizeWarn", false, null), new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.StackSize", false, null), + new Property("IceGrid\\.Registry\\.SessionManager\\.ThreadPool\\.Serialize", false, null), new Property("IceGrid\\.Registry\\.SessionTimeout", false, null), new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.EndpointSelection", false, null), new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.ConnectionCached", false, null), @@ -358,7 +342,6 @@ public final class PropertyNames new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.Router", false, null), new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.CollocationOptimization", true, "IceGrid.Registry.SSLPermissionsVerifier.CollocationOptimized"), new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.CollocationOptimized", false, null), - new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier\\.ThreadPerConnection", false, null), new Property("IceGrid\\.Registry\\.SSLPermissionsVerifier", false, null), new Property("IceGrid\\.Registry\\.Trace\\.Application", false, null), new Property("IceGrid\\.Registry\\.Trace\\.Adapter", false, null), @@ -383,12 +366,11 @@ public final class PropertyNames new Property("IcePatch2\\.ReplicaGroupId", false, null), new Property("IcePatch2\\.Router", false, null), new Property("IcePatch2\\.ProxyOptions", false, null), - new Property("IcePatch2\\.ThreadPerConnection", false, null), - new Property("IcePatch2\\.ThreadPerConnection\\.StackSize", false, null), new Property("IcePatch2\\.ThreadPool\\.Size", false, null), new Property("IcePatch2\\.ThreadPool\\.SizeMax", false, null), new Property("IcePatch2\\.ThreadPool\\.SizeWarn", false, null), new Property("IcePatch2\\.ThreadPool\\.StackSize", false, null), + new Property("IcePatch2\\.ThreadPool\\.Serialize", false, null), new Property("IcePatch2\\.Admin\\.AdapterId", true, null), new Property("IcePatch2\\.Admin\\.Endpoints", true, null), new Property("IcePatch2\\.Admin\\.Locator", true, null), @@ -396,8 +378,6 @@ public final class PropertyNames new Property("IcePatch2\\.Admin\\.RegisterProcess", true, null), new Property("IcePatch2\\.Admin\\.ReplicaGroupId", true, null), new Property("IcePatch2\\.Admin\\.Router", true, null), - new Property("IcePatch2\\.Admin\\.ThreadPerConnection", true, null), - new Property("IcePatch2\\.Admin\\.ThreadPerConnection\\.StackSize", true, null), new Property("IcePatch2\\.Admin\\.ThreadPool\\.Size", true, null), new Property("IcePatch2\\.Admin\\.ThreadPool\\.SizeMax", true, null), new Property("IcePatch2\\.Admin\\.ThreadPool\\.SizeWarn", true, null), @@ -453,62 +433,6 @@ public final class PropertyNames null }; - public static final Property IceStormProps[] = - { - new Property("IceStorm\\.Flush\\.Timeout", false, null), - new Property("IceStorm\\.InstanceName", false, null), - new Property("IceStorm\\.Publish\\.AdapterId", false, null), - new Property("IceStorm\\.Publish\\.Endpoints", false, null), - new Property("IceStorm\\.Publish\\.Locator", false, null), - new Property("IceStorm\\.Publish\\.PublishedEndpoints", false, null), - new Property("IceStorm\\.Publish\\.RegisterProcess", true, null), - new Property("IceStorm\\.Publish\\.ReplicaGroupId", false, null), - new Property("IceStorm\\.Publish\\.Router", false, null), - new Property("IceStorm\\.Publish\\.ProxyOptions", false, null), - new Property("IceStorm\\.Publish\\.ThreadPerConnection", false, null), - new Property("IceStorm\\.Publish\\.ThreadPerConnection\\.StackSize", false, null), - new Property("IceStorm\\.Publish\\.ThreadPool\\.Size", false, null), - new Property("IceStorm\\.Publish\\.ThreadPool\\.SizeMax", false, null), - new Property("IceStorm\\.Publish\\.ThreadPool\\.SizeWarn", false, null), - new Property("IceStorm\\.Publish\\.ThreadPool\\.StackSize", false, null), - new Property("IceStorm\\.TopicManager\\.AdapterId", false, null), - new Property("IceStorm\\.TopicManager\\.Endpoints", false, null), - new Property("IceStorm\\.TopicManager\\.Locator", false, null), - new Property("IceStorm\\.TopicManager\\.PublishedEndpoints", false, null), - new Property("IceStorm\\.TopicManager\\.RegisterProcess", true, null), - new Property("IceStorm\\.TopicManager\\.ReplicaGroupId", false, null), - new Property("IceStorm\\.TopicManager\\.Router", false, null), - new Property("IceStorm\\.TopicManager\\.ProxyOptions", false, null), - new Property("IceStorm\\.TopicManager\\.ThreadPerConnection", false, null), - new Property("IceStorm\\.TopicManager\\.ThreadPerConnection\\.StackSize", false, null), - new Property("IceStorm\\.TopicManager\\.ThreadPool\\.Size", false, null), - new Property("IceStorm\\.TopicManager\\.ThreadPool\\.SizeMax", false, null), - new Property("IceStorm\\.TopicManager\\.ThreadPool\\.SizeWarn", false, null), - new Property("IceStorm\\.TopicManager\\.ThreadPool\\.StackSize", false, null), - new Property("IceStorm\\.TopicManager\\.Proxy\\.EndpointSelection", false, null), - new Property("IceStorm\\.TopicManager\\.Proxy\\.ConnectionCached", false, null), - new Property("IceStorm\\.TopicManager\\.Proxy\\.PreferSecure", false, null), - new Property("IceStorm\\.TopicManager\\.Proxy\\.LocatorCacheTimeout", false, null), - new Property("IceStorm\\.TopicManager\\.Proxy\\.Locator", false, null), - new Property("IceStorm\\.TopicManager\\.Proxy\\.Router", false, null), - new Property("IceStorm\\.TopicManager\\.Proxy\\.CollocationOptimization", true, "IceStorm.TopicManager.Proxy.CollocationOptimized"), - new Property("IceStorm\\.TopicManager\\.Proxy\\.CollocationOptimized", false, null), - new Property("IceStorm\\.TopicManager\\.Proxy\\.ThreadPerConnection", false, null), - new Property("IceStorm\\.TopicManager\\.Proxy", false, null), - new Property("IceStorm\\.SubscriberPool\\.Size", false, null), - new Property("IceStorm\\.SubscriberPool\\.SizeMax", false, null), - new Property("IceStorm\\.SubscriberPool\\.SizeWarn", false, null), - new Property("IceStorm\\.SubscriberPool\\.StackSize", false, null), - new Property("IceStorm\\.Trace\\.Flush", false, null), - new Property("IceStorm\\.Trace\\.Subscriber", false, null), - new Property("IceStorm\\.Trace\\.SubscriberPool", false, null), - new Property("IceStorm\\.Trace\\.Topic", false, null), - new Property("IceStorm\\.Trace\\.TopicManager", false, null), - new Property("IceStorm\\.Send\\.Timeout", false, null), - new Property("IceStorm\\.Discard\\.Interval", false, null), - null - }; - public static final Property Glacier2Props[] = { new Property("Glacier2\\.AddSSLContext", false, null), @@ -520,8 +444,6 @@ public final class PropertyNames new Property("Glacier2\\.Admin\\.RegisterProcess", true, null), new Property("Glacier2\\.Admin\\.ReplicaGroupId", true, null), new Property("Glacier2\\.Admin\\.Router", true, null), - new Property("Glacier2\\.Admin\\.ThreadPerConnection", true, null), - new Property("Glacier2\\.Admin\\.ThreadPerConnection\\.StackSize", true, null), new Property("Glacier2\\.Admin\\.ThreadPool\\.Size", true, null), new Property("Glacier2\\.Admin\\.ThreadPool\\.SizeMax", true, null), new Property("Glacier2\\.Admin\\.ThreadPool\\.SizeWarn", true, null), @@ -535,12 +457,11 @@ public final class PropertyNames new Property("Glacier2\\.Client\\.ReplicaGroupId", false, null), new Property("Glacier2\\.Client\\.Router", false, null), new Property("Glacier2\\.Client\\.ProxyOptions", false, null), - new Property("Glacier2\\.Client\\.ThreadPerConnection", false, null), - new Property("Glacier2\\.Client\\.ThreadPerConnection\\.StackSize", false, null), new Property("Glacier2\\.Client\\.ThreadPool\\.Size", false, null), new Property("Glacier2\\.Client\\.ThreadPool\\.SizeMax", false, null), new Property("Glacier2\\.Client\\.ThreadPool\\.SizeWarn", false, null), new Property("Glacier2\\.Client\\.ThreadPool\\.StackSize", false, null), + new Property("Glacier2\\.Client\\.ThreadPool\\.Serialize", false, null), new Property("Glacier2\\.Client\\.AlwaysBatch", false, null), new Property("Glacier2\\.Client\\.Buffered", false, null), new Property("Glacier2\\.Client\\.ForwardContext", false, null), @@ -565,7 +486,6 @@ public final class PropertyNames new Property("Glacier2\\.PermissionsVerifier\\.Router", false, null), new Property("Glacier2\\.PermissionsVerifier\\.CollocationOptimization", true, "Glacier2.PermissionsVerifier.CollocationOptimized"), new Property("Glacier2\\.PermissionsVerifier\\.CollocationOptimized", false, null), - new Property("Glacier2\\.PermissionsVerifier\\.ThreadPerConnection", false, null), new Property("Glacier2\\.PermissionsVerifier", false, null), new Property("Glacier2\\.ReturnClientProxy", false, null), new Property("Glacier2\\.SSLPermissionsVerifier\\.EndpointSelection", false, null), @@ -576,7 +496,6 @@ public final class PropertyNames new Property("Glacier2\\.SSLPermissionsVerifier\\.Router", false, null), new Property("Glacier2\\.SSLPermissionsVerifier\\.CollocationOptimization", true, "Glacier2.SSLPermissionsVerifier.CollocationOptimized"), new Property("Glacier2\\.SSLPermissionsVerifier\\.CollocationOptimized", false, null), - new Property("Glacier2\\.SSLPermissionsVerifier\\.ThreadPerConnection", false, null), new Property("Glacier2\\.SSLPermissionsVerifier", false, null), new Property("Glacier2\\.RoutingTable\\.MaxSize", false, null), new Property("Glacier2\\.Server\\.AdapterId", false, null), @@ -587,12 +506,11 @@ public final class PropertyNames new Property("Glacier2\\.Server\\.ReplicaGroupId", false, null), new Property("Glacier2\\.Server\\.Router", false, null), new Property("Glacier2\\.Server\\.ProxyOptions", false, null), - new Property("Glacier2\\.Server\\.ThreadPerConnection", false, null), - new Property("Glacier2\\.Server\\.ThreadPerConnection\\.StackSize", false, null), new Property("Glacier2\\.Server\\.ThreadPool\\.Size", false, null), new Property("Glacier2\\.Server\\.ThreadPool\\.SizeMax", false, null), new Property("Glacier2\\.Server\\.ThreadPool\\.SizeWarn", false, null), new Property("Glacier2\\.Server\\.ThreadPool\\.StackSize", false, null), + new Property("Glacier2\\.Server\\.ThreadPool\\.Serialize", false, null), new Property("Glacier2\\.Server\\.AlwaysBatch", false, null), new Property("Glacier2\\.Server\\.Buffered", false, null), new Property("Glacier2\\.Server\\.ForwardContext", false, null), @@ -607,7 +525,6 @@ public final class PropertyNames new Property("Glacier2\\.SessionManager\\.Router", false, null), new Property("Glacier2\\.SessionManager\\.CollocationOptimization", true, "Glacier2.SessionManager.CollocationOptimized"), new Property("Glacier2\\.SessionManager\\.CollocationOptimized", false, null), - new Property("Glacier2\\.SessionManager\\.ThreadPerConnection", false, null), new Property("Glacier2\\.SessionManager", false, null), new Property("Glacier2\\.SSLSessionManager\\.EndpointSelection", false, null), new Property("Glacier2\\.SSLSessionManager\\.ConnectionCached", false, null), @@ -617,7 +534,6 @@ public final class PropertyNames new Property("Glacier2\\.SSLSessionManager\\.Router", false, null), new Property("Glacier2\\.SSLSessionManager\\.CollocationOptimization", true, "Glacier2.SSLSessionManager.CollocationOptimized"), new Property("Glacier2\\.SSLSessionManager\\.CollocationOptimized", false, null), - new Property("Glacier2\\.SSLSessionManager\\.ThreadPerConnection", false, null), new Property("Glacier2\\.SSLSessionManager", false, null), new Property("Glacier2\\.SessionTimeout", false, null), new Property("Glacier2\\.Trace\\.RoutingTable", false, null), @@ -667,7 +583,6 @@ public final class PropertyNames IcePatch2Props, IceSSLProps, IceStormAdminProps, - IceStormProps, Glacier2Props, FreezeProps, null @@ -683,7 +598,6 @@ public final class PropertyNames "IcePatch2", "IceSSL", "IceStormAdmin", - "IceStorm", "Glacier2", "Freeze", null diff --git a/java/src/IceInternal/Reference.java b/java/src/IceInternal/Reference.java index a05becb3c56..5fdb2696e70 100644 --- a/java/src/IceInternal/Reference.java +++ b/java/src/IceInternal/Reference.java @@ -83,7 +83,6 @@ public abstract class Reference implements Cloneable public abstract boolean getCacheConnection(); public abstract boolean getPreferSecure(); public abstract Ice.EndpointSelectionType getEndpointSelection(); - public abstract boolean getThreadPerConnection(); public abstract int getLocatorCacheTimeout(); // @@ -179,7 +178,6 @@ public abstract class Reference implements Cloneable public abstract Reference changeCacheConnection(boolean newCache); public abstract Reference changePreferSecure(boolean newPreferSecure); public abstract Reference changeEndpointSelection(Ice.EndpointSelectionType newType); - public abstract Reference changeThreadPerConnection(boolean newTpc); public abstract Reference changeLocatorCacheTimeout(int newTimeout); public abstract Reference changeTimeout(int newTimeout); diff --git a/java/src/IceInternal/ReferenceFactory.java b/java/src/IceInternal/ReferenceFactory.java index 2e5ed274c90..23e5094dcd0 100644 --- a/java/src/IceInternal/ReferenceFactory.java +++ b/java/src/IceInternal/ReferenceFactory.java @@ -647,8 +647,7 @@ public final class ReferenceFactory "LocatorCacheTimeout", "Locator", "Router", - "CollocationOptimized", - "ThreadPerConnection" + "CollocationOptimized" }; private void @@ -717,7 +716,6 @@ public final class ReferenceFactory boolean cacheConnection = true; boolean preferSecure = defaultsAndOverrides.defaultPreferSecure; Ice.EndpointSelectionType endpointSelection = defaultsAndOverrides.defaultEndpointSelection; - boolean threadPerConnection = _instance.threadPerConnection(); int locatorCacheTimeout = defaultsAndOverrides.defaultLocatorCacheTimeout; // @@ -787,9 +785,6 @@ public final class ReferenceFactory } } - property = propertyPrefix + ".ThreadPerConnection"; - threadPerConnection = properties.getPropertyAsIntWithDefault(property, threadPerConnection ? 1 : 0) > 0; - property = propertyPrefix + ".LocatorCacheTimeout"; locatorCacheTimeout = properties.getPropertyAsIntWithDefault(property, locatorCacheTimeout); } @@ -812,7 +807,6 @@ public final class ReferenceFactory cacheConnection, preferSecure, endpointSelection, - threadPerConnection, locatorCacheTimeout)); } diff --git a/java/src/IceInternal/RequestHandler.java b/java/src/IceInternal/RequestHandler.java index 2daa83c2325..f45cfe6e9dd 100644 --- a/java/src/IceInternal/RequestHandler.java +++ b/java/src/IceInternal/RequestHandler.java @@ -18,11 +18,11 @@ public interface RequestHandler Ice.ConnectionI sendRequest(Outgoing out) throws LocalExceptionWrapper; - void sendAsyncRequest(OutgoingAsync out) + boolean sendAsyncRequest(OutgoingAsync out) throws LocalExceptionWrapper; boolean flushBatchRequests(BatchOutgoing out); - void flushAsyncBatchRequests(BatchOutgoingAsync out); + boolean flushAsyncBatchRequests(BatchOutgoingAsync out); Reference getReference(); diff --git a/java/src/IceInternal/RoutableReference.java b/java/src/IceInternal/RoutableReference.java index 0b89530575f..a16fffe72c6 100644 --- a/java/src/IceInternal/RoutableReference.java +++ b/java/src/IceInternal/RoutableReference.java @@ -59,12 +59,6 @@ public class RoutableReference extends Reference return _endpointSelection; } - public final boolean - getThreadPerConnection() - { - return _threadPerConnection; - } - public final int getLocatorCacheTimeout() { @@ -189,18 +183,6 @@ public class RoutableReference extends Reference } public Reference - changeThreadPerConnection(boolean newTpc) - { - if(newTpc == _threadPerConnection) - { - return this; - } - RoutableReference r = (RoutableReference)getInstance().referenceFactory().copy(this); - r._threadPerConnection = newTpc; - return r; - } - - public Reference changeLocatorCacheTimeout(int newTimeout) { if(_locatorCacheTimeout == newTimeout) @@ -390,10 +372,6 @@ public class RoutableReference extends Reference { return false; } - if(_threadPerConnection != rhs._threadPerConnection) - { - return false; - } if(_locatorCacheTimeout != rhs._locatorCacheTimeout) { return false; @@ -618,7 +596,6 @@ public class RoutableReference extends Reference boolean cacheConnection, boolean prefereSecure, Ice.EndpointSelectionType endpointSelection, - boolean threadPerConnection, int locatorCacheTimeout) { super(instance, communicator, identity, context, facet, mode, secure); @@ -630,7 +607,6 @@ public class RoutableReference extends Reference _cacheConnection = cacheConnection; _preferSecure = prefereSecure; _endpointSelection = endpointSelection; - _threadPerConnection = threadPerConnection; _locatorCacheTimeout = locatorCacheTimeout; _overrideTimeout = false; _timeout = -1; @@ -798,7 +774,7 @@ public class RoutableReference extends Reference // Get an existing connection or create one if there's no // existing connection to one of the given endpoints. // - connection = factory.create(endpoints, false, _threadPerConnection, getEndpointSelection(), compress); + connection = factory.create(endpoints, false, getEndpointSelection(), compress); } else { @@ -818,8 +794,7 @@ public class RoutableReference extends Reference { endpoint[0] = endpoints[i]; final boolean more = i != endpoints.length - 1; - connection = factory.create(endpoint, more, _threadPerConnection, getEndpointSelection(), - compress); + connection = factory.create(endpoint, more, getEndpointSelection(), compress); break; } catch(Ice.LocalException ex) @@ -870,7 +845,7 @@ public class RoutableReference extends Reference // Get an existing connection or create one if there's no // existing connection to one of the given endpoints. // - factory.create(endpoints, false, _threadPerConnection, getEndpointSelection(), + factory.create(endpoints, false, getEndpointSelection(), new OutgoingConnectionFactory.CreateConnectionCallback() { public void @@ -905,7 +880,7 @@ public class RoutableReference extends Reference // connection for one of the endpoints. // - factory.create(new EndpointI[]{ endpoints[0] }, true, _threadPerConnection, getEndpointSelection(), + factory.create(new EndpointI[]{ endpoints[0] }, true, getEndpointSelection(), new OutgoingConnectionFactory.CreateConnectionCallback() { public void @@ -939,7 +914,7 @@ public class RoutableReference extends Reference final boolean more = _i != endpoints.length - 1; final EndpointI[] endpoint = new EndpointI[]{ endpoints[_i] }; - factory.create(endpoint, more, _threadPerConnection, getEndpointSelection(), this); + factory.create(endpoint, more, getEndpointSelection(), this); } private int _i = 0; @@ -1003,7 +978,6 @@ public class RoutableReference extends Reference private boolean _cacheConnection; private boolean _preferSecure; private Ice.EndpointSelectionType _endpointSelection; - private boolean _threadPerConnection; private int _locatorCacheTimeout; private boolean _overrideTimeout; diff --git a/java/src/IceInternal/Selector.java b/java/src/IceInternal/Selector.java new file mode 100644 index 00000000000..32b7f36d641 --- /dev/null +++ b/java/src/IceInternal/Selector.java @@ -0,0 +1,479 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public final class Selector +{ + + public + Selector(Instance instance, int timeout) + { + _instance = instance; + _timeout = timeout; + _interruptCount = 0; + + Network.SocketPair pair = Network.createPipe(); + _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; + _fdIntrWrite = pair.sink; + try + { + _selector = java.nio.channels.Selector.open(); + pair.source.configureBlocking(false); + _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ); + } + catch(java.io.IOException ex) + { + Ice.SyscallException sys = new Ice.SyscallException(); + sys.initCause(ex); + throw sys; + } + + // + // The Selector holds a Set representing the selected keys. The + // Set reference doesn't change, so we obtain it once here. + // + _keys = _selector.selectedKeys(); + } + + public void + destroy() + { + try + { + _selector.close(); + } + catch(java.io.IOException ex) + { + } + _selector = null; + + try + { + _fdIntrWrite.close(); + } + catch(java.io.IOException ex) + { + } + _fdIntrWrite = null; + + try + { + _fdIntrRead.close(); + } + catch(java.io.IOException ex) + { + } + _fdIntrRead = null; + } + + public void + add(SelectorHandler handler, SocketStatus status) + { + // Note: we can't support noInterrupt for add() because a channel can't be registered again + // with the selector until the previous selection key has been removed from the cancel-key + // set of the selector on the next select() operation. + + handler._pendingStatus = status; + if(_changes.add(handler)) + { + setInterrupt(); + } + } + + public void + update(SelectorHandler handler, SocketStatus newStatus) + { + // Note: can only be called from the select() thread + assert(handler._key != null); + handler._key.interestOps(convertStatus(handler.fd(), newStatus)); + } + + public void + remove(SelectorHandler handler) + { + // Note: we can't support noInterrupt for remove() because a channel can't be registered again + // with the selector until the previous selection key has been removed from the cancel-key + // set of the selector on the next select() operation. + + handler._pendingStatus = IceInternal.SocketStatus.Finished; + if(_changes.add(handler)) + { + setInterrupt(); + } + } + + public void + select() + throws java.io.IOException + { + // + // If there are still interrupts, selected keys or pending handlers to process, + // return immediately. + // + if(_interrupted || !_keys.isEmpty() || !_pendingHandlers.isEmpty()) + { + return; + } + + // + // There's nothing left to process, we can now select. + // + while(true) + { + try + { + if(_nextPendingHandlers.isEmpty()) + { + if(_timeout > 0) + { + _selector.select(_timeout * 1000); + } + else + { + _selector.select(); + } + } + else + { + _selector.selectNow(); + + java.util.HashSet<SelectorHandler> tmp = _nextPendingHandlers; + _nextPendingHandlers = _pendingHandlers; + _pendingHandlers = tmp; + } + } + catch(java.nio.channels.CancelledKeyException ex) + { + // This sometime occurs on Mac OS X, ignore. + continue; + } + catch(java.io.IOException ex) + { + // + // Pressing Ctrl-C causes select() to raise an + // IOException, which seems like a JDK bug. We trap + // for that special case here and ignore it. + // Hopefully we're not masking something important! + // + if(Network.interrupted(ex)) + { + continue; + } + + throw ex; + } + + break; + } + } + + public SelectorHandler + getNextSelected() + { + assert(_interruptCount == 0); + + if(_iter == null && !_keys.isEmpty()) + { + _iter = _keys.iterator(); + } + + while(_iter != null && _iter.hasNext()) + { + java.nio.channels.SelectionKey key = _iter.next(); + _iter.remove(); + SelectorHandler handler = (SelectorHandler)key.attachment(); + if(handler == null) + { + assert(_pendingInterruptRead > 0); + _pendingInterruptRead -= readInterrupt(_pendingInterruptRead); + continue; + } + else if(handler._key == null || !handler._key.isValid()) + { + continue; + } + if(handler.hasMoreData()) + { + _pendingHandlers.remove(handler); + } + return handler; + } + + if(_pendingIter == null && !_pendingHandlers.isEmpty()) + { + _pendingIter = _pendingHandlers.iterator(); + } + + while(_pendingIter != null && _pendingIter.hasNext()) + { + SelectorHandler handler = _pendingIter.next(); + _pendingIter.remove(); + if(handler._key == null || !handler._key.isValid() || !handler.hasMoreData()) + { + continue; + } + return handler; + } + + _iter = null; + _pendingIter = null; + return null; + } + + public void + hasMoreData(SelectorHandler handler) + { + _nextPendingHandlers.add(handler); + } + + public boolean + processInterrupt() + { + assert(_changes.size() <= _interruptCount); + + if(!_changes.isEmpty()) + { + java.util.Iterator<SelectorHandler> p = _changes.iterator(); + while(p.hasNext()) + { + SelectorHandler handler = p.next(); + if(handler._pendingStatus == SocketStatus.Finished) + { + removeImpl(handler); + } + else + { + addImpl(handler, handler._pendingStatus); + } + clearInterrupt(); + } + _changes.clear(); + + // + // We call selectNow() to flush the cancelled-key set and ensure handlers can be + // added again once this returns. + // + try + { + _selector.selectNow(); + } + catch(java.io.IOException ex) + { + // Ignore. + } + _iter = null; // Current iterator is invalidated by selectNow() + } + + _interrupted = _interruptCount > 0; + return _interruptCount == 0; // No more interrupts to process. + } + + public boolean + checkTimeout() + { + if(_interruptCount == 0 && _keys.isEmpty() && _pendingHandlers.isEmpty()) + { + if(_timeout <= 0) + { + // + // This is necessary to prevent a busy loop in case of a spurious wake-up which + // sometime occurs in the client thread pool when the communicator is destroyed. + // If there are too many successive spurious wake-ups, we log an error. + // + try + { + Thread.currentThread().sleep(1); + } + catch(java.lang.InterruptedException ex) + { + } + + if(++_spuriousWakeUp > 100) + { + _instance.initializationData().logger.error("spurious selector wake up"); + } + return false; + } + return true; + } + else + { + _spuriousWakeUp = 0; + return false; + } + } + + public boolean + isInterrupted() + { + return _interruptCount > 0; + } + + public void + setInterrupt() + { + if(++_interruptCount == 1) + { + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); + buf.put(0, (byte)0); + while(buf.hasRemaining()) + { + try + { + _fdIntrWrite.write(buf); + } + catch(java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } + } + } + } + + public boolean + clearInterrupt() + { + if(--_interruptCount == 0) + { + // + // If the interrupt byte has not been received by the pipe yet, we just increment + // _pendingInterruptRead. It will be read when the _fdIntrRead is ready for read. + // + if(_keys.contains(_fdIntrReadKey)) + { + readInterrupt(1); + _keys.remove(_fdIntrReadKey); + _iter = null; + } + else + { + ++_pendingInterruptRead; + } + _interrupted = false; + return false; + } + else + { + return true; + } + } + + private int + readInterrupt(int count) + { + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(count); + try + { + buf.rewind(); + int ret = _fdIntrRead.read(buf); + assert(ret > 0); + return ret; + } + catch(java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } + } + + private int + convertStatus(java.nio.channels.SelectableChannel fd, SocketStatus status) + { + if(status == SocketStatus.NeedConnect) + { + return java.nio.channels.SelectionKey.OP_CONNECT; + } + else if(status == SocketStatus.NeedRead) + { + if((fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) + { + return java.nio.channels.SelectionKey.OP_READ; + } + else + { + return java.nio.channels.SelectionKey.OP_ACCEPT; + } + } + else + { + assert(status == SocketStatus.NeedWrite); + return java.nio.channels.SelectionKey.OP_WRITE; + } + } + + private void + addImpl(SelectorHandler handler, SocketStatus status) + { + if(handler._key != null) + { + handler._key.interestOps(convertStatus(handler.fd(), status)); + } + else + { + try + { + handler._key = handler.fd().register(_selector, convertStatus(handler.fd(), status), handler); + } + catch(java.nio.channels.ClosedChannelException ex) + { + assert(false); + } + assert(!_nextPendingHandlers.contains(handler)); + } + + if(handler.hasMoreData()) + { + _nextPendingHandlers.add(handler); + } + } + + private void + removeImpl(SelectorHandler handler) + { + _nextPendingHandlers.remove(handler); + + if(handler._key != null) + { + try + { + handler._key.cancel(); + handler._key = null; + } + catch(java.nio.channels.CancelledKeyException ex) + { + assert(false); + } + } + } + + final private Instance _instance; + final private int _timeout; + + private java.nio.channels.Selector _selector; + private java.nio.channels.ReadableByteChannel _fdIntrRead; + private java.nio.channels.WritableByteChannel _fdIntrWrite; + private java.nio.channels.SelectionKey _fdIntrReadKey; + + private java.util.Set<java.nio.channels.SelectionKey> _keys; + private java.util.Iterator<java.nio.channels.SelectionKey> _iter; + private java.util.HashSet<SelectorHandler> _changes = new java.util.HashSet<SelectorHandler>(); + + private boolean _interrupted; + private int _spuriousWakeUp; + private int _interruptCount; + private int _pendingInterruptRead; + + private java.util.HashSet<SelectorHandler> _pendingHandlers = new java.util.HashSet<SelectorHandler>(); + private java.util.HashSet<SelectorHandler> _nextPendingHandlers = new java.util.HashSet<SelectorHandler>(); + private java.util.Iterator<SelectorHandler> _pendingIter; +}; diff --git a/java/src/IceInternal/SelectorHandler.java b/java/src/IceInternal/SelectorHandler.java new file mode 100644 index 00000000000..8c139e6030b --- /dev/null +++ b/java/src/IceInternal/SelectorHandler.java @@ -0,0 +1,31 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +abstract class SelectorHandler +{ + abstract public java.nio.channels.SelectableChannel fd(); + + // + // In Java, it's possible that the transceiver reads more data + // than what was really asked. If this is the case, hasMoreData() + // returns true and the handler read() method should be called + // again (without doing a select()). This is handled by the + // Selector class (it adds the handler to a separate list of handlers + // if this method returns true.) + // + abstract public boolean hasMoreData(); + + // + // The _key data member are only for use by the Selector. + // + protected java.nio.channels.SelectionKey _key; + protected SocketStatus _pendingStatus; +}; diff --git a/java/src/IceInternal/SelectorThread.java b/java/src/IceInternal/SelectorThread.java index f3a824bfdc0..2753edcef84 100644 --- a/java/src/IceInternal/SelectorThread.java +++ b/java/src/IceInternal/SelectorThread.java @@ -11,48 +11,27 @@ package IceInternal; public class SelectorThread { - public interface SocketReadyCallback + static public abstract class SocketReadyCallback extends SelectorHandler implements TimerTask { - // - // The selector thread unregisters the callback when socketReady returns SocketStatus.Finished. - // - SocketStatus socketReady(boolean finished); + abstract public SocketStatus socketReady(); + abstract public void socketFinished(); // // The selector thread doesn't unregister the callback when sockectTimeout is called; socketTimeout // must unregister the callback either explicitly with unregister() or by shutting down the socket // (if necessary). // - void socketTimeout(); + //abstract void socketTimeout(); + + protected int _timeout; + protected SocketStatus _status; } SelectorThread(Instance instance) { _instance = instance; _destroyed = false; - - Network.SocketPair pair = Network.createPipe(); - _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; - _fdIntrWrite = pair.sink; - - try - { - _selector = java.nio.channels.Selector.open(); - pair.source.configureBlocking(false); - _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ); - } - catch(java.io.IOException ex) - { - Ice.SyscallException sys = new Ice.SyscallException(); - sys.initCause(ex); - throw sys; - } - - // - // The Selector holds a Set representing the selected keys. The - // Set reference doesn't change, so we obtain it once here. - // - _keys = _selector.selectedKeys(); + _selector = new Selector(instance, 0); _thread = new HelperThread(); _thread.start(); @@ -72,34 +51,44 @@ public class SelectorThread { assert(!_destroyed); _destroyed = true; - setInterrupt(); + _selector.setInterrupt(); } public synchronized void - _register(java.nio.channels.SelectableChannel fd, SocketReadyCallback cb, SocketStatus status, int timeout) + _register(SocketReadyCallback cb, SocketStatus status, int timeout) { assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories. assert(status != SocketStatus.Finished); - SocketInfo info = new SocketInfo(fd, cb, status, timeout); - _changes.add(info); - if(info.timeout >= 0) + cb._timeout = timeout; + cb._status = status; + if(cb._timeout >= 0) { - _timer.schedule(info, info.timeout); + _timer.schedule(cb, cb._timeout); } - setInterrupt(); + + _selector.add(cb, status); } - // - // Unregister the given file descriptor. The registered callback will be notified with socketReady() - // upon registration to allow some cleanup to be done. - // public synchronized void - unregister(java.nio.channels.SelectableChannel fd) + unregister(SocketReadyCallback cb) { + // Note: unregister should only be called from the socketReady() call-back. assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories. - _changes.add(new SocketInfo(fd, null, SocketStatus.Finished, 0)); - setInterrupt(); + + _selector.remove(cb); + cb._status = SocketStatus.Finished; + } + + public synchronized void + finish(SocketReadyCallback cb) + { + assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories. + + _selector.remove(cb); + + _finished.add(cb); + _selector.setInterrupt(); } public void @@ -117,110 +106,48 @@ public class SelectorThread } } - private void - clearInterrupt() - { - byte b = 0; - - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - try - { - while(true) - { - buf.rewind(); - if(_fdIntrRead.read(buf) != 1) - { - break; - } - - b = buf.get(0); - break; - } - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - - private void - setInterrupt() + public void + run() { - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - buf.put(0, (byte)0); - while(buf.hasRemaining()) + while(true) { try { - _fdIntrWrite.write(buf); + _selector.select(); } catch(java.io.IOException ex) { Ice.SocketException se = new Ice.SocketException(); se.initCause(ex); - throw se; + //throw se; + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + se.printStackTrace(pw); + pw.flush(); + String s = "exception in selector thread:\n" + sw.toString(); + _instance.initializationData().logger.error(s); + continue; } - } - } - - public void - run() - { - java.util.Map<java.nio.channels.SelectableChannel, SocketInfo> socketMap = - new java.util.HashMap<java.nio.channels.SelectableChannel, SocketInfo>(); - java.util.LinkedList<SocketInfo> readyList = new java.util.LinkedList<SocketInfo>(); - java.util.LinkedList<SocketInfo> finishedList = new java.util.LinkedList<SocketInfo>(); - while(true) - { - int ret = 0; + + java.util.LinkedList<SocketReadyCallback> readyList = new java.util.LinkedList<SocketReadyCallback>(); + boolean finished = false; - while(true) + synchronized(this) { - try - { - ret = _selector.select(); - } - catch(java.io.IOException ex) + _selector.checkTimeout(); + + if(_selector.isInterrupted()) { - // - // Pressing Ctrl-C causes select() to raise an - // IOException, which seems like a JDK bug. We trap - // for that special case here and ignore it. - // Hopefully we're not masking something important! - // - if(Network.interrupted(ex)) + if(_selector.processInterrupt()) { continue; } - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - //throw se; - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - se.printStackTrace(pw); - pw.flush(); - String s = "exception in selector thread:\n" + sw.toString(); - _instance.initializationData().logger.error(s); - continue; - } - - break; - } - - assert(readyList.isEmpty() && finishedList.isEmpty()); - - if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable()) - { - synchronized(this) - { // // There are two possiblities for an interrupt: // // 1. The selector thread has been destroyed. - // 2. A socket was registered or unregistered. + // 2. A callback is being finished // // @@ -231,82 +158,45 @@ public class SelectorThread break; } - // - // Remove the interrupt channel from the selected key set. - // - _keys.remove(_fdIntrReadKey); - - clearInterrupt(); - SocketInfo info = _changes.removeFirst(); - if(info.cb != null) // Registration + do { - try - { - info.key = info.fd.register(_selector, convertStatus(info.status), info); - } - catch(java.nio.channels.ClosedChannelException ex) - { - assert(false); - } - assert(!socketMap.containsKey(info.fd)); - socketMap.put(info.fd, info); - } - else // Unregistration - { - info = socketMap.get(info.fd); - if(info != null && info.status != SocketStatus.Finished) - { - if(info.timeout >= 0) - { - _timer.cancel(info); - } - - try - { - info.key.cancel(); - } - catch(java.nio.channels.CancelledKeyException ex) - { - assert(false); - } - info.status = SocketStatus.Finished; - readyList.add(info); - } + SocketReadyCallback cb = _finished.removeFirst(); + cb._status = SocketStatus.Finished; + readyList.add(cb); } + while(_selector.clearInterrupt()); // As long as there are interrupts + finished = true; } - } - else - { - // - // Examine the selection key set. - // - java.util.Iterator<java.nio.channels.SelectionKey> iter = _keys.iterator(); - while(iter.hasNext()) + else { - // - // Ignore selection keys that have been cancelled or timed out. - // - java.nio.channels.SelectionKey key = iter.next(); - iter.remove(); - assert(key != _fdIntrReadKey); - SocketInfo info = (SocketInfo)key.attachment(); - if(info.timeout >= 0) + SocketReadyCallback cb; + while((cb = (SocketReadyCallback)_selector.getNextSelected()) != null) { - _timer.cancel(info); + readyList.add(cb); } - assert(key.isValid()); - readyList.add(info); } } - java.util.Iterator<SocketInfo> iter = readyList.iterator(); + java.util.Iterator<SocketReadyCallback> iter = readyList.iterator(); while(iter.hasNext()) { - SocketInfo info = iter.next(); - SocketStatus status; + SocketStatus status = SocketStatus.Finished; + SocketReadyCallback cb = iter.next(); try { - status = info.cb.socketReady(info.status == SocketStatus.Finished); + if(cb._timeout >= 0) + { + _timer.cancel(cb); + } + + if(finished) + { + cb.socketFinished(); + } + else + { + status = cb.socketReady(); + } } catch(Ice.LocalException ex) { @@ -320,142 +210,39 @@ public class SelectorThread status = SocketStatus.Finished; } - if(status == SocketStatus.Finished) - { - finishedList.add(info); - } - else + if(status != SocketStatus.Finished) { - assert(info.status != SocketStatus.Finished); - try - { - info.status = status; - info.key.interestOps(convertStatus(status)); - if(info.timeout >= 0) - { - _timer.schedule(info, info.timeout); - } - } - catch(java.nio.channels.CancelledKeyException ex) + if(cb.hasMoreData()) { - assert(false); + _selector.hasMoreData(cb); } - } - } - readyList.clear(); - - if(finishedList.isEmpty()) - { - continue; - } - iter = finishedList.iterator(); - while(iter.hasNext()) - { - SocketInfo info = iter.next(); - if(info.status != SocketStatus.Finished) - { - try + if(status != cb._status) { - info.key.cancel(); + synchronized(this) + { + _selector.update(cb, status); + cb._status = status; + } } - catch(java.nio.channels.CancelledKeyException ex) + + if(cb._timeout >= 0) { - //assert(false); // The channel might already be closed at this point so we can't assert. + _timer.schedule(cb, cb._timeout); } } - socketMap.remove(info.fd); } - finishedList.clear(); } assert(_destroyed); - try - { - _selector.close(); - } - catch(java.io.IOException ex) - { - // Ignore. - } - - try - { - _fdIntrWrite.close(); - } - catch(java.io.IOException ex) - { - // - // BUGFIX: - // - // Ignore this exception. This shouldn't happen - // but for some reasons the close() call raises - // "java.io.IOException: No such file or - // directory" under Linux with JDK 1.4.2. - // - } - _fdIntrWrite = null; - - try - { - _fdIntrRead.close(); - } - catch(java.io.IOException ex) - { - } - _fdIntrRead = null; - } - - private int - convertStatus(SocketStatus status) - { - if(status == SocketStatus.NeedConnect) - { - return java.nio.channels.SelectionKey.OP_CONNECT; - } - else if(status == SocketStatus.NeedRead) - { - return java.nio.channels.SelectionKey.OP_READ; - } - else - { - assert(status == SocketStatus.NeedWrite); - return java.nio.channels.SelectionKey.OP_WRITE; - } + _selector.destroy(); } private Instance _instance; private boolean _destroyed; - private java.nio.channels.ReadableByteChannel _fdIntrRead; - private java.nio.channels.SelectionKey _fdIntrReadKey; - private java.nio.channels.WritableByteChannel _fdIntrWrite; - private java.nio.channels.Selector _selector; - private java.util.Set<java.nio.channels.SelectionKey> _keys; - private java.util.LinkedList<SocketInfo> _changes = new java.util.LinkedList<SocketInfo>(); - - private final class SocketInfo implements TimerTask - { - java.nio.channels.SelectableChannel fd; - SocketReadyCallback cb; - SocketStatus status; - int timeout; - java.nio.channels.SelectionKey key; - - public void - runTimerTask() - { - this.cb.socketTimeout(); // Exceptions will be reported by the timer thread. - } - - SocketInfo(java.nio.channels.SelectableChannel fd, SocketReadyCallback cb, SocketStatus status, int timeout) - { - this.fd = fd; - this.cb = cb; - this.status = status; - this.timeout = timeout; - } - } + private Selector _selector; + private java.util.LinkedList<SocketReadyCallback> _finished = new java.util.LinkedList<SocketReadyCallback>(); private final class HelperThread extends Thread { diff --git a/java/src/IceInternal/TcpAcceptor.java b/java/src/IceInternal/TcpAcceptor.java index 72c8b923d06..87e1f619adf 100644 --- a/java/src/IceInternal/TcpAcceptor.java +++ b/java/src/IceInternal/TcpAcceptor.java @@ -26,37 +26,9 @@ class TcpAcceptor implements Acceptor _logger.trace(_traceLevels.networkCat, s); } - java.nio.channels.ServerSocketChannel fd; - java.nio.channels.Selector selector; - synchronized(this) - { - fd = _fd; - selector = _selector; - _fd = null; - _selector = null; - } - if(fd != null) - { - try - { - fd.close(); - } - catch(java.io.IOException ex) - { - // Ignore. - } - } - if(selector != null) - { - try - { - selector.close(); - } - catch(java.io.IOException ex) - { - // Ignore. - } - } + assert(_fd != null); + Network.closeSocketNoThrow(_fd); + _fd = null; } public void @@ -72,60 +44,15 @@ class TcpAcceptor implements Acceptor } public Transceiver - accept(int timeout) + accept() { java.nio.channels.SocketChannel fd = null; - while(fd == null) + while(true) { try { fd = _fd.accept(); - if(fd == null) - { - if(_selector == null) - { - _selector = java.nio.channels.Selector.open(); - } - - while(true) - { - try - { - java.nio.channels.SelectionKey key = - _fd.register(_selector, java.nio.channels.SelectionKey.OP_ACCEPT); - if(timeout > 0) - { - if(_selector.select(timeout) == 0) - { - throw new Ice.TimeoutException(); - } - } - else if(timeout == 0) - { - if(_selector.selectNow() == 0) - { - throw new Ice.TimeoutException(); - } - } - else - { - _selector.select(); - } - - break; - } - catch(java.io.IOException ex) - { - if(Network.interrupted(ex)) - { - continue; - } - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - } + break; } catch(java.io.IOException ex) { @@ -163,15 +90,6 @@ class TcpAcceptor implements Acceptor return new TcpTransceiver(_instance, fd, true); } - public void - connectToSelf() - { - java.nio.channels.SocketChannel fd = Network.createTcpSocket(); - Network.setBlock(fd, false); - Network.doConnect(fd, _addr, -1); - Network.closeSocket(fd); - } - public String toString() { diff --git a/java/src/IceInternal/TcpConnector.java b/java/src/IceInternal/TcpConnector.java index db4061b1721..99a80dff1db 100644 --- a/java/src/IceInternal/TcpConnector.java +++ b/java/src/IceInternal/TcpConnector.java @@ -12,7 +12,7 @@ package IceInternal; final class TcpConnector implements Connector, java.lang.Comparable { public Transceiver - connect(int timeout) + connect() { if(_traceLevels.network >= 2) { @@ -25,7 +25,7 @@ final class TcpConnector implements Connector, java.lang.Comparable java.nio.channels.SocketChannel fd = Network.createTcpSocket(); Network.setBlock(fd, false); Network.setTcpBufSize(fd, _instance.initializationData().properties, _logger); - boolean connected = Network.doConnect(fd, _addr, timeout); + boolean connected = Network.doConnect(fd, _addr); if(connected) { if(_traceLevels.network >= 1) diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java index 80730ce6db0..548f2c3a94b 100644 --- a/java/src/IceInternal/TcpTransceiver.java +++ b/java/src/IceInternal/TcpTransceiver.java @@ -19,9 +19,9 @@ final class TcpTransceiver implements Transceiver } public SocketStatus - initialize(int timeout) + initialize() { - if(_state == StateNeedConnect && timeout == 0) + if(_state == StateNeedConnect) { _state = StateConnectPending; return SocketStatus.NeedConnect; @@ -30,7 +30,7 @@ final class TcpTransceiver implements Transceiver { try { - Network.doFinishConnect(_fd, timeout); + Network.doFinishConnect(_fd); _state = StateConnected; _desc = Network.fdToString(_fd); } @@ -63,82 +63,10 @@ final class TcpTransceiver implements Transceiver _logger.trace(_traceLevels.networkCat, s); } - synchronized(this) - { - assert(_fd != null); - if(_readSelector != null) - { - try - { - _readSelector.close(); - } - catch(java.io.IOException ex) - { - // Ignore. - } - _readSelector = null; - } - if(_writeSelector != null) - { - try - { - _writeSelector.close(); - } - catch(java.io.IOException ex) - { - // Ignore. - } - _writeSelector = null; - } - try - { - _fd.close(); - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - finally - { - _fd = null; - } - } - } - - public void - shutdownWrite() - { - if(_state < StateConnected) - { - return; - } - - if(_traceLevels.network >= 2) - { - String s = "shutting down tcp connection for writing\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); - } - assert(_fd != null); - java.net.Socket socket = _fd.socket(); try { - socket.shutdownOutput(); // Shutdown socket for writing - } - catch(java.net.SocketException ex) - { - // - // Ignore. We can't reliably figure out if the socket - // exception is because the socket is not connected. - // - // if(!Network.notConnected(ex)) - // { - // Ice.SocketException se = new Ice.SocketException(); - // se.initCause(ex); - // throw se; - // } + _fd.close(); } catch(java.io.IOException ex) { @@ -146,107 +74,85 @@ final class TcpTransceiver implements Transceiver se.initCause(ex); throw se; } - } - - public void - shutdownReadWrite() - { - if(_state < StateConnected) - { - return; - } - - if(_traceLevels.network >= 2) - { - String s = "shutting down tcp connection for reading and writing\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); - } - - assert(_fd != null); - java.net.Socket socket = _fd.socket(); - try + finally { - socket.shutdownInput(); // Shutdown socket for reading - socket.shutdownOutput(); // Shutdown socket for writing - } - catch(java.net.SocketException ex) - { - // - // Ignore. We can't reliably figure out if the socket - // exception is because the socket is not connected. - // - // if(!Network.notConnected(ex)) - // { - // Ice.SocketException se = new Ice.SocketException(); - // se.initCause(ex); - // throw se; - // } - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; + _fd = null; } } - + public boolean - write(Buffer buf, int timeout) + write(Buffer buf) { - while(writeBuffer(buf.b)) + final int size = buf.b.limit(); + int packetSize = size - buf.b.position(); + if(_maxPacketSize > 0 && packetSize > _maxPacketSize) { - // - // There is more data to write but the socket would block; now we - // must deal with timeouts. - // - assert(buf.b.hasRemaining()); + packetSize = _maxPacketSize; + buf.b.limit(buf.b.position() + packetSize); + } - if(timeout == 0) - { - return false; - } - + while(buf.b.hasRemaining()) + { try { - if(_writeSelector == null) + assert(_fd != null); + int ret = _fd.write(buf.b); + + if(ret == -1) { - _writeSelector = java.nio.channels.Selector.open(); - _fd.register(_writeSelector, java.nio.channels.SelectionKey.OP_WRITE, null); + throw new Ice.ConnectionLostException(); } - - try + else if(ret == 0) { - if(timeout > 0) - { - long start = IceInternal.Time.currentMonotonicTimeMillis(); - int n = _writeSelector.select(timeout); - if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout) - { - throw new Ice.TimeoutException(); - } - } - else + // + // Writing would block, so we reset the limit (if necessary) and return true to indicate + // that more data must be sent. + // + if(packetSize == _maxPacketSize) { - _writeSelector.select(); + buf.b.limit(size); } + return false; + } + + if(_traceLevels.network >= 3) + { + String s = "sent " + ret + " of " + size + " bytes via tcp\n" + toString(); + _logger.trace(_traceLevels.networkCat, s); } - catch(java.io.InterruptedIOException ex) + + if(_stats != null) { - // Ignore. + _stats.bytesSent(type(), ret); + } + + if(packetSize == _maxPacketSize) + { + assert(buf.b.position() == buf.b.limit()); + packetSize = size - buf.b.position(); + if(packetSize > _maxPacketSize) + { + packetSize = _maxPacketSize; + } + buf.b.limit(buf.b.position() + packetSize); } } + catch(java.io.InterruptedIOException ex) + { + continue; + } catch(java.io.IOException ex) { Ice.SocketException se = new Ice.SocketException(); se.initCause(ex); throw se; } - } + } return true; } public boolean - read(Buffer buf, int timeout, Ice.BooleanHolder moreData) + read(Buffer buf, Ice.BooleanHolder moreData) { int remaining = 0; if(_traceLevels.network >= 3) @@ -269,39 +175,7 @@ final class TcpTransceiver implements Transceiver if(ret == 0) { - if(timeout == 0) - { - return false; - } - - if(_readSelector == null) - { - _readSelector = java.nio.channels.Selector.open(); - _fd.register(_readSelector, java.nio.channels.SelectionKey.OP_READ, null); - } - - try - { - if(timeout > 0) - { - long start = IceInternal.Time.currentMonotonicTimeMillis(); - int n = _readSelector.select(timeout); - if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout) - { - throw new Ice.TimeoutException(); - } - } - else - { - _readSelector.select(); - } - } - catch(java.io.InterruptedIOException ex) - { - // Ignore. - } - - continue; + return false; } if(ret > 0) @@ -398,86 +272,12 @@ final class TcpTransceiver implements Transceiver super.finalize(); } - private boolean - writeBuffer(java.nio.ByteBuffer buf) - { - final int size = buf.limit(); - int packetSize = size - buf.position(); - if(_maxPacketSize > 0 && packetSize > _maxPacketSize) - { - packetSize = _maxPacketSize; - buf.limit(buf.position() + packetSize); - } - - while(buf.hasRemaining()) - { - try - { - assert(_fd != null); - int ret = _fd.write(buf); - - if(ret == -1) - { - throw new Ice.ConnectionLostException(); - } - else if(ret == 0) - { - // - // Writing would block, so we reset the limit (if necessary) and return true to indicate - // that more data must be sent. - // - if(packetSize == _maxPacketSize) - { - buf.limit(size); - } - return true; - } - - if(_traceLevels.network >= 3) - { - String s = "sent " + ret + " of " + size + " bytes via tcp\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); - } - - if(_stats != null) - { - _stats.bytesSent(type(), ret); - } - - if(packetSize == _maxPacketSize) - { - assert(buf.position() == buf.limit()); - packetSize = size - buf.position(); - if(packetSize > _maxPacketSize) - { - packetSize = _maxPacketSize; - } - buf.limit(buf.position() + packetSize); - } - } - catch(java.io.InterruptedIOException ex) - { - continue; - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - - return false; // No more data to send. - } - private java.nio.channels.SocketChannel _fd; private TraceLevels _traceLevels; private Ice.Logger _logger; private Ice.Stats _stats; private String _desc; private int _state; - private java.nio.channels.Selector _readSelector; - private java.nio.channels.Selector _writeSelector; private int _maxPacketSize; private static final int StateNeedConnect = 0; diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index f38ea735b6f..6f19615ecf0 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -26,11 +26,13 @@ public final class ThreadPool _destroyed = false; _prefix = prefix; _timeout = timeout; + _selector = new Selector(instance, timeout); _threadIndex = 0; _running = 0; _inUse = 0; _load = 1.0; _promote = true; + _serialize = _instance.initializationData().properties.getPropertyAsInt(_prefix + ".Serialize") > 0; _warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; String programName = _instance.initializationData().properties.getProperty("Ice.ProgramName"); @@ -43,29 +45,6 @@ public final class ThreadPool _programNamePrefix = ""; } - Network.SocketPair pair = Network.createPipe(); - _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; - _fdIntrWrite = pair.sink; - - try - { - _selector = java.nio.channels.Selector.open(); - pair.source.configureBlocking(false); - _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ); - } - catch(java.io.IOException ex) - { - Ice.SyscallException sys = new Ice.SyscallException(); - sys.initCause(ex); - throw sys; - } - - // - // The Selector holds a Set representing the selected keys. The - // Set reference doesn't change, so we obtain it once here. - // - _keys = _selector.selectedKeys(); - // // We use just one thread as the default. This is the fastest // possible setting, still allows one level of nesting, and @@ -144,53 +123,71 @@ public final class ThreadPool } assert(!_destroyed); - assert(_handlerMap.isEmpty()); _destroyed = true; - setInterrupt(); + _selector.setInterrupt(); } public synchronized void - _register(java.nio.channels.SelectableChannel fd, EventHandler handler) + _register(EventHandler handler) { - if(TRACE_REGISTRATION) + assert(!_destroyed); + + if(!handler._registered) { - trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd); + if(TRACE_REGISTRATION) + { + trace("adding handler of type " + handler.getClass().getName() + " for channel " + handler.fd()); + } + + if(!handler._serializing) + { + _selector.add(handler, SocketStatus.NeedRead); + } + handler._registered = true; } - assert(!_destroyed); - _changes.add(new FdHandlerPair(fd, handler)); - setInterrupt(); } public synchronized void - unregister(java.nio.channels.SelectableChannel fd) + unregister(EventHandler handler) { - if(TRACE_REGISTRATION) + assert(!_destroyed); + if(handler._registered) { - if(TRACE_STACK_TRACE) + if(TRACE_REGISTRATION) { - java.io.StringWriter sw = new java.io.StringWriter(); - try - { - throw new RuntimeException(); - } - catch(RuntimeException ex) - { - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - } - trace("removing handler for channel " + fd + "\n" + sw.toString()); + trace("removing handler for channel " + handler.fd()); } - else + + if(!handler._serializing) { - trace("removing handler for channel " + fd); + _selector.remove(handler); } + handler._registered = false; } + } + public synchronized void + finish(EventHandler handler) + { assert(!_destroyed); - _changes.add(new FdHandlerPair(fd, null)); - setInterrupt(); - } + + if(TRACE_REGISTRATION) + { + trace("finishing handler for channel " + handler.fd()); + } + + if(handler._registered) + { + if(!handler._serializing) + { + _selector.remove(handler); + } + handler._registered = false; + } + + _finished.add(handler); + _selector.setInterrupt(); + } public synchronized void execute(ThreadPoolWorkItem workItem) @@ -200,16 +197,25 @@ public final class ThreadPool throw new Ice.CommunicatorDestroyedException(); } _workItems.add(workItem); - setInterrupt(); + _selector.setInterrupt(); } public void - promoteFollower() + promoteFollower(EventHandler handler) { if(_sizeMax > 1) { synchronized(this) { + if(_serialize && handler != null) + { + handler._serializing = true; + if(handler._registered) + { + _selector.remove(handler); + } + } + assert(!_promote); _promote = true; notify(); @@ -280,65 +286,9 @@ public final class ThreadPool } // - // Cleanup the selector, and the socket pair. + // Destroy the selector // - try - { - if(_selector != null) - { - try - { - _selector.close(); - } - catch(java.io.IOException ex) - { - // - // BUGFIX: - // - // Ignore this exception. This shouldn't happen - // but for some reasons the close() call raises - // "java.io.IOException: Bad file descriptor" on - // Mac OS X 10.3.x (it works fine on OS X 10.4.x) - // - } - _selector = null; - } - - if(_fdIntrWrite != null) - { - try - { - _fdIntrWrite.close(); - } - catch(java.io.IOException ex) - { - // - // BUGFIX: - // - // Ignore this exception. This shouldn't happen - // but for some reasons the close() call raises - // "java.io.IOException: No such file or - // directory" under Linux with JDK 1.4.2. - // - } - _fdIntrWrite = null; - } - - if(_fdIntrRead != null) - { - _fdIntrRead.close(); - _fdIntrRead = null; - } - } - catch(java.io.IOException ex) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "' while calling close():\n" + sw.toString(); - _instance.initializationData().logger.error(s); - } + _selector.destroy(); } public String @@ -346,91 +296,6 @@ public final class ThreadPool { return _prefix; } - - private void - clearInterrupt() - { - if(TRACE_INTERRUPT) - { - trace("clearInterrupt"); - if(TRACE_STACK_TRACE) - { - try - { - throw new RuntimeException(); - } - catch(RuntimeException ex) - { - ex.printStackTrace(); - } - } - } - - byte b = 0; - - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - try - { - while(true) - { - buf.rewind(); - if(_fdIntrRead.read(buf) != 1) - { - break; - } - - if(TRACE_INTERRUPT) - { - trace("clearInterrupt got byte " + (int)buf.get(0)); - } - - b = buf.get(0); - break; - } - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - - private void - setInterrupt() - { - if(TRACE_INTERRUPT) - { - trace("setInterrupt()"); - if(TRACE_STACK_TRACE) - { - try - { - throw new RuntimeException(); - } - catch(RuntimeException ex) - { - ex.printStackTrace(); - } - } - } - - java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1); - buf.put(0, (byte)0); - while(buf.hasRemaining()) - { - try - { - _fdIntrWrite.write(buf); - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - } // // Each thread supplies a BasicStream, to avoid creating excessive @@ -465,206 +330,85 @@ public final class ThreadPool while(true) { - if(TRACE_REGISTRATION) - { - java.util.Set<java.nio.channels.SelectionKey> keys = _selector.keys(); - trace("selecting on " + keys.size() + " channels:"); - java.util.Iterator<java.nio.channels.SelectionKey> i = keys.iterator(); - while(i.hasNext()) - { - java.nio.channels.SelectionKey key = i.next(); - trace(" " + key.channel()); - } - } - - EventHandler handler = null; - ThreadPoolWorkItem workItem = null; - - // - // Only call select() if there are no pending handlers with additional data - // for us to read. - // - if(!_pendingHandlers.isEmpty()) + try { - handler = _pendingHandlers.removeFirst(); + _selector.select(); } - else + catch(java.io.IOException ex) { - select(); + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + //throw se; + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + se.printStackTrace(pw); + pw.flush(); + String s = "exception in `" + _prefix + "':\n" + sw.toString(); + _instance.initializationData().logger.error(s); + continue; } + EventHandler handler = null; + ThreadPoolWorkItem workItem = null; boolean finished = false; boolean shutdown = false; - if(handler == null) + synchronized(this) { - synchronized(this) + if(_selector.checkTimeout()) + { + assert(_timeout > 0); + shutdown = true; + } + else if(_selector.isInterrupted()) { - if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout. + if(_selector.processInterrupt()) { - if(TRACE_SELECT) - { - trace("timeout"); - } - - assert(_timeout > 0); - _timeout = 0; - shutdown = true; + continue; + } + + // + // There are three possiblities for an interrupt: + // + // 1. The thread pool has been destroyed. + // + // 2. An event handler is being finished. + // + // 3. A work item has been scheduled. + // + + if(!_finished.isEmpty()) + { + _selector.clearInterrupt(); + handler = _finished.removeFirst(); + finished = true; + } + else if(!_workItems.isEmpty()) + { + // + // Work items must be executed first even if the thread pool is destroyed. + // + _selector.clearInterrupt(); + workItem = _workItems.removeFirst(); + } + else if(_destroyed) + { + // + // Don't clear the interrupt if destroyed, so that the other threads exit as well. + // + return true; } else { - if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable()) - { - if(TRACE_SELECT || TRACE_INTERRUPT) - { - trace("detected interrupt"); - } - - // - // There are three possiblities for an interrupt: - // - // 1. The thread pool has been destroyed. - // - // 2. An event handler was registered or unregistered. - // - // 3. A work item has been scheduled. - // - - if(!_workItems.isEmpty()) - { - // - // Work items must be executed first even if the thread pool is destroyed. - // - - // - // Remove the interrupt channel from the selected key set. - // - _keys.remove(_fdIntrReadKey); - clearInterrupt(); - assert(!_workItems.isEmpty()); - workItem = _workItems.removeFirst(); - } - else if(_destroyed) - { - if(TRACE_SHUTDOWN) - { - trace("destroyed, thread id = " + Thread.currentThread()); - } - - // - // Don't clear the interrupt fd if destroyed, so that the other threads exit as well. - // - return true; - } - else - { - // - // Remove the interrupt channel from the selected key set. - // - _keys.remove(_fdIntrReadKey); - clearInterrupt(); - - // - // An event handler must have been registered or unregistered. - // - assert(!_changes.isEmpty()); - FdHandlerPair change = _changes.removeFirst(); - - if(change.handler != null) // Addition if handler is set. - { - int op; - if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) - { - op = java.nio.channels.SelectionKey.OP_READ; - } - else - { - op = java.nio.channels.SelectionKey.OP_ACCEPT; - } - - java.nio.channels.SelectionKey key = null; - try - { - key = change.fd.register(_selector, op, change.handler); - } - catch(java.nio.channels.ClosedChannelException ex) - { - assert(false); - } - _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key)); - - // - // If the handler is readable and already has some data to read add it - // to the _pendingHandlers list to ensure it will be processed. - // - if(change.handler.readable() && change.handler.hasMoreData()) - { - _pendingHandlers.add(change.handler); - } - - if(TRACE_REGISTRATION) - { - trace("added handler (" + change.handler.getClass().getName() + ") for fd " + - change.fd); - } - - continue; - } - else // Removal if handler is not set. - { - HandlerKeyPair pair = _handlerMap.remove(change.fd); - assert(pair != null); - handler = pair.handler; - finished = true; - pair.key.cancel(); - - if(TRACE_REGISTRATION) - { - trace("removed handler (" + handler.getClass().getName() + ") for fd " + - change.fd); - } - - // Don't continue; we have to call - // finished() on the event handler below, - // outside the thread synchronization. - } - } - } - else - { - java.nio.channels.SelectionKey key = null; - java.util.Iterator<java.nio.channels.SelectionKey> iter = _keys.iterator(); - while(iter.hasNext()) - { - // - // Ignore selection keys that have been cancelled - // - java.nio.channels.SelectionKey k = iter.next(); - iter.remove(); - if(k.isValid() && k != _fdIntrReadKey) - { - if(TRACE_SELECT) - { - trace("found a key: " + keyToString(k)); - } - - key = k; - break; - } - } - - if(key == null) - { - if(TRACE_SELECT) - { - trace("didn't find a valid key"); - } - - continue; - } - - handler = (EventHandler)key.attachment(); - } + assert(false); + } + } + else + { + handler = (EventHandler)_selector.getNextSelected(); + if(handler == null) + { + continue; } } } @@ -675,11 +419,6 @@ public final class ThreadPool if(shutdown) { - if(TRACE_SHUTDOWN) - { - trace("shutdown detected"); - } - // // Initiate server shutdown. // @@ -693,7 +432,7 @@ public final class ThreadPool continue; } - promoteFollower(); + promoteFollower(null); factory.shutdown(); // @@ -732,8 +471,7 @@ public final class ThreadPool if(finished) { // - // Notify a handler about its removal from - // the thread pool. + // Notify a handler about its removal from the thread pool. // try { @@ -773,14 +511,10 @@ public final class ThreadPool { continue; // Can't read without blocking. } - - // - // If the handler has more data to process add it to the _pendingHandlers list - // to ensure it will be processed. - // + if(handler.hasMoreData()) { - _pendingHandlers.add(handler); + _selector.hasMoreData(handler); } } catch(Ice.TimeoutException ex) @@ -871,6 +605,15 @@ public final class ThreadPool { if(!_destroyed) { + if(_serialize && handler != null && handler._serializing) + { + if(handler._registered) + { + _selector.add(handler, SocketStatus.NeedRead); + } + handler._serializing = false; + } + // // First we reap threads that have been // destroyed before. @@ -1146,84 +889,6 @@ public final class ThreadPool */ private void - select() - { - int ret = 0; - int spuriousWakeUp = 0; - while(true) - { - try - { - if(TRACE_SELECT) - { - trace("select on " + _selector.keys().size() + " keys, thread id = " + Thread.currentThread()); - } - - if(_timeout > 0) - { - ret = _selector.select(_timeout * 1000); - } - else - { - ret = _selector.select(); - } - } - catch(java.io.IOException ex) - { - // - // Pressing Ctrl-C causes select() to raise an - // IOException, which seems like a JDK bug. We trap - // for that special case here and ignore it. - // Hopefully we're not masking something important! - // - if(Network.interrupted(ex)) - { - continue; - } - - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - //throw se; - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - se.printStackTrace(pw); - pw.flush(); - String s = "exception in `" + _prefix + "':\n" + sw.toString(); - _instance.initializationData().logger.error(s); - continue; - } - - if(TRACE_SELECT) - { - trace("select() returned " + ret + ", _keys.size() = " + _keys.size()); - } - - if(ret == 0 && _timeout <= 0) - { - // - // This is necessary to prevent a busy loop in case of a spurious wake-up which - // sometime occurs in the client thread pool when the communicator is destroyed. - // If there are too many successive spurious wake-ups, we log an error. - // - try - { - Thread.currentThread().sleep(1); - } - catch(java.lang.InterruptedException ex) - { - } - - if(++spuriousWakeUp > 100) - { - _instance.initializationData().logger.error("spurious selector wake up in `" + _prefix + "'"); - } - } - - break; - } - } - - private void trace(String msg) { System.err.println(_prefix + ": " + msg); @@ -1253,57 +918,15 @@ public final class ThreadPool return key.channel() + " " + ops; } - private static final class FdHandlerPair - { - java.nio.channels.SelectableChannel fd; - EventHandler handler; - - FdHandlerPair(java.nio.channels.SelectableChannel fd, EventHandler handler) - { - this.fd = fd; - this.handler = handler; - } - } - - private static final class HandlerKeyPair - { - EventHandler handler; - java.nio.channels.SelectionKey key; - - HandlerKeyPair(EventHandler handler, java.nio.channels.SelectionKey key) - { - this.handler = handler; - this.key = key; - } - } - private Instance _instance; private boolean _destroyed; private final String _prefix; private final String _programNamePrefix; - - private java.nio.channels.ReadableByteChannel _fdIntrRead; - private java.nio.channels.SelectionKey _fdIntrReadKey; - private java.nio.channels.WritableByteChannel _fdIntrWrite; - private java.nio.channels.Selector _selector; - private java.util.Set<java.nio.channels.SelectionKey> _keys; - - private java.util.LinkedList<FdHandlerPair> _changes = new java.util.LinkedList<FdHandlerPair>(); + private final Selector _selector; private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>(); - - private java.util.Map<java.nio.channels.SelectableChannel, HandlerKeyPair> _handlerMap = - new java.util.HashMap<java.nio.channels.SelectableChannel, HandlerKeyPair>(); - + private java.util.LinkedList<EventHandler> _finished = new java.util.LinkedList<EventHandler>(); private int _timeout; - // - // Since the Java5 SSL transceiver can read more data from the socket than is - // actually requested, we have to keep a separate list of handlers that need - // the thread pool to read more data before it re-enters a blocking call to - // select(). - // - private java.util.LinkedList<EventHandler> _pendingHandlers = new java.util.LinkedList<EventHandler>(); - private final class EventHandlerThread extends Thread { EventHandlerThread(String name) @@ -1377,6 +1000,7 @@ public final class ThreadPool private final int _size; // Number of threads that are pre-created. private final int _sizeMax; // Maximum number of threads. private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed. + private final boolean _serialize; // True if requests need to be serialized over the connection. private java.util.List<EventHandlerThread> _threads; // All threads, running or not. private int _threadIndex; // For assigning thread names. diff --git a/java/src/IceInternal/Transceiver.java b/java/src/IceInternal/Transceiver.java index 7bdd2369ba8..ff76dcc85e6 100644 --- a/java/src/IceInternal/Transceiver.java +++ b/java/src/IceInternal/Transceiver.java @@ -24,40 +24,28 @@ public interface Transceiver // socket is ready for reading or writing and until it returns // SocketStatus.Finished. // - SocketStatus initialize(int timeout); + SocketStatus initialize(); void close(); - void shutdownWrite(); - void shutdownReadWrite(); // // Write data. // - // Returns true if all the data was written, false otherwise. If - // timeout is -1, this operation will block until all the data is - // written. If timeout is 0, it will return when the write can't - // be completed without blocking. If the timeout is > 0, it will - // block until all the data is written or the specified timeout - // expires. + // Returns true if all the data was written, false otherwise. // - boolean write(Buffer buf, int timeout); + boolean write(Buffer buf); // // Read data. // // Returns true if all the requested data was read, false otherwise. - // If timeout is -1, this operation will block until all the data - // is read. If timeout is 0, it will return when the read can't be - // completed without blocking. If the timeout is > 0, it will - // block until all the data is read or the specified timeout - // expires. // // NOTE: In Java, read() returns a boolean in moreData to indicate // whether the transceiver has read more data than requested. // If moreData is true, read should be called again without // calling select on the FD. // - boolean read(Buffer buf, int timeout, Ice.BooleanHolder moreData); + boolean read(Buffer buf, Ice.BooleanHolder moreData); String type(); String toString(); diff --git a/java/src/IceInternal/UdpConnector.java b/java/src/IceInternal/UdpConnector.java index f9c5b6a7a52..3fbad48782b 100644 --- a/java/src/IceInternal/UdpConnector.java +++ b/java/src/IceInternal/UdpConnector.java @@ -12,7 +12,7 @@ package IceInternal; final class UdpConnector implements Connector, java.lang.Comparable { public Transceiver - connect(int timeout) + connect() { return new UdpTransceiver(_instance, _addr, _mcastInterface, _mcastTtl); } diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java index 62000f84b37..1410829c30c 100644 --- a/java/src/IceInternal/UdpTransceiver.java +++ b/java/src/IceInternal/UdpTransceiver.java @@ -19,7 +19,7 @@ final class UdpTransceiver implements Transceiver } public SocketStatus - initialize(int timeout) + initialize() { // // Nothing to do. @@ -27,73 +27,29 @@ final class UdpTransceiver implements Transceiver return SocketStatus.Finished; } - public synchronized void + public void close() { - // - // NOTE: closeSocket() may have already been invoked by shutdownReadWrite(). - // - closeSocket(); - - if(_readSelector != null) - { - try - { - _readSelector.close(); - } - catch(java.io.IOException ex) - { - // Ignore. - } - _readSelector = null; - } - - if(_writeSelector != null) + assert(_fd != null); + + if(_traceLevels.network >= 1) { - try - { - _writeSelector.close(); - } - catch(java.io.IOException ex) - { - // Ignore. - } - _writeSelector = null; + String s = "closing udp connection\n" + toString(); + _logger.trace(_traceLevels.networkCat, s); } - } - - public void - shutdownWrite() - { - // - // NOTE: DatagramSocket does not support shutdownOutput. - // - } - - public synchronized void - shutdownReadWrite() - { - // - // NOTE: DatagramSocket does not support shutdownInput, and we - // cannot use the C++ technique of sending a "wakeup" packet to - // this socket because the Java implementation deadlocks when we - // call disconnect() while receive() is in progress. Therefore - // we close the socket here and wake up the selector. - // - closeSocket(); - - if(_readSelector != null) + + try { - _readSelector.wakeup(); + _fd.close(); } - if(_writeSelector != null) + catch(java.io.IOException ex) { - _writeSelector.wakeup(); } + _fd = null; } public boolean - write(Buffer buf, int timeout) + write(Buffer buf) { assert(buf.b.position() == 0); final int packetSize = java.lang.Math.min(_maxPacketSize, _sndSize - _udpOverhead); @@ -115,41 +71,7 @@ final class UdpTransceiver implements Transceiver if(ret == 0) { - if(timeout == 0) - { - return false; - } - - synchronized(this) - { - if(_writeSelector == null) - { - _writeSelector = java.nio.channels.Selector.open(); - _fd.register(_writeSelector, java.nio.channels.SelectionKey.OP_WRITE, null); - } - } - - try - { - if(timeout > 0) - { - long start = IceInternal.Time.currentMonotonicTimeMillis(); - int n = _writeSelector.select(timeout); - if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout) - { - throw new Ice.TimeoutException(); - } - } - else - { - _writeSelector.select(); - } - } - catch(java.io.InterruptedIOException ex) - { - // Ignore. - } - continue; + return false; } if(_traceLevels.network >= 3) @@ -194,7 +116,7 @@ final class UdpTransceiver implements Transceiver } public boolean - read(Buffer buf, int timeout, Ice.BooleanHolder moreData) + read(Buffer buf, Ice.BooleanHolder moreData) { assert(buf.b.position() == 0); moreData.value = false; @@ -218,60 +140,13 @@ final class UdpTransceiver implements Transceiver int ret = 0; while(true) { - // - // Check for shutdown. - // - java.nio.channels.DatagramChannel fd = null; - synchronized(this) - { - if(_fd == null) - { - throw new Ice.ConnectionLostException(); - } - fd = _fd; - } - try { - java.net.InetSocketAddress sender = (java.net.InetSocketAddress)fd.receive(buf.b); + java.net.InetSocketAddress sender = (java.net.InetSocketAddress)_fd.receive(buf.b); if(sender == null || buf.b.position() == 0) { - if(timeout == 0) - { - return false; - } - - synchronized(this) - { - if(_readSelector == null) - { - _readSelector = java.nio.channels.Selector.open(); - _fd.register(_readSelector, java.nio.channels.SelectionKey.OP_READ, null); - } - } - - try - { - if(timeout > 0) - { - long start = IceInternal.Time.currentMonotonicTimeMillis(); - int n = _readSelector.select(timeout); - if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout) - { - throw new Ice.TimeoutException(); - } - } - else - { - _readSelector.select(); - } - } - catch(java.io.InterruptedIOException ex) - { - // Ignore. - } - continue; + return false; } ret = buf.b.position(); @@ -282,7 +157,7 @@ final class UdpTransceiver implements Transceiver // If we must connect, then we connect to the first peer that // sends us a packet. // - Network.doConnect(fd, sender, -1); + Network.doConnect(_fd, sender); _connect = false; // We're connected now if(_traceLevels.network >= 1) @@ -400,7 +275,7 @@ final class UdpTransceiver implements Transceiver _fd = Network.createUdpSocket(); setBufSize(instance); Network.setBlock(_fd, false); - Network.doConnect(_fd, _addr, -1); + Network.doConnect(_fd, _addr); _connect = false; // We're connected now if(_addr.getAddress().isMulticastAddress()) { @@ -633,23 +508,6 @@ final class UdpTransceiver implements Transceiver private void closeSocket() { - if(_fd != null) - { - if(_traceLevels.network >= 1) - { - String s = "closing udp connection\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); - } - - try - { - _fd.close(); - } - catch(java.io.IOException ex) - { - } - _fd = null; - } } protected synchronized void @@ -671,8 +529,6 @@ final class UdpTransceiver implements Transceiver private int _sndSize; private java.nio.channels.DatagramChannel _fd; private java.net.InetSocketAddress _addr; - private java.nio.channels.Selector _readSelector; - private java.nio.channels.Selector _writeSelector; private boolean mcastServer = false; // diff --git a/java/src/IceInternal/UnknownEndpointI.java b/java/src/IceInternal/UnknownEndpointI.java index 6e940af8ed7..63cff5902ab 100644 --- a/java/src/IceInternal/UnknownEndpointI.java +++ b/java/src/IceInternal/UnknownEndpointI.java @@ -360,12 +360,6 @@ final class UnknownEndpointI extends EndpointI return 0; } - public boolean - requiresThreadPerConnection() - { - return false; - } - private void calcHashValue() { diff --git a/java/src/IceSSL/AcceptorI.java b/java/src/IceSSL/AcceptorI.java index de5be3781a3..aadf7316c50 100644 --- a/java/src/IceSSL/AcceptorI.java +++ b/java/src/IceSSL/AcceptorI.java @@ -26,30 +26,9 @@ final class AcceptorI implements IceInternal.Acceptor _logger.trace(_instance.networkTraceCategory(), s); } - java.nio.channels.ServerSocketChannel fd; - java.nio.channels.Selector selector; - synchronized(this) - { - fd = _fd; - selector = _selector; - _fd = null; - _selector = null; - } - if(fd != null) - { - IceInternal.Network.closeSocketNoThrow(fd); - } - if(selector != null) - { - try - { - selector.close(); - } - catch(java.io.IOException ex) - { - // Ignore. - } - } + assert(_fd != null); + IceInternal.Network.closeSocketNoThrow(_fd); + _fd = null; } public void @@ -65,7 +44,7 @@ final class AcceptorI implements IceInternal.Acceptor } public IceInternal.Transceiver - accept(int timeout) + accept() { // // The plugin may not be fully initialized. @@ -83,52 +62,6 @@ final class AcceptorI implements IceInternal.Acceptor try { fd = _fd.accept(); - if(fd == null) - { - if(_selector == null) - { - _selector = java.nio.channels.Selector.open(); - } - - while(true) - { - try - { - java.nio.channels.SelectionKey key = - _fd.register(_selector, java.nio.channels.SelectionKey.OP_ACCEPT); - if(timeout > 0) - { - if(_selector.select(timeout) == 0) - { - throw new Ice.TimeoutException(); - } - } - else if(timeout == 0) - { - if(_selector.selectNow() == 0) - { - throw new Ice.TimeoutException(); - } - } - else - { - _selector.select(); - } - - break; - } - catch(java.io.IOException ex) - { - if(IceInternal.Network.interrupted(ex)) - { - continue; - } - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - } } catch(java.io.IOException ex) { @@ -178,15 +111,6 @@ final class AcceptorI implements IceInternal.Acceptor return new TransceiverI(_instance, engine, fd, "", true, true, _adapterName); } - public void - connectToSelf() - { - java.nio.channels.SocketChannel fd = IceInternal.Network.createTcpSocket(); - IceInternal.Network.setBlock(fd, false); - IceInternal.Network.doConnect(fd, _addr, -1); - IceInternal.Network.closeSocketNoThrow(fd); - } - public String toString() { @@ -258,5 +182,4 @@ final class AcceptorI implements IceInternal.Acceptor private java.nio.channels.ServerSocketChannel _fd; private int _backlog; private java.net.InetSocketAddress _addr; - private java.nio.channels.Selector _selector; } diff --git a/java/src/IceSSL/ConnectorI.java b/java/src/IceSSL/ConnectorI.java index 91f170685b0..370237563e8 100644 --- a/java/src/IceSSL/ConnectorI.java +++ b/java/src/IceSSL/ConnectorI.java @@ -12,7 +12,7 @@ package IceSSL; final class ConnectorI implements IceInternal.Connector, java.lang.Comparable { public IceInternal.Transceiver - connect(int timeout) + connect() { // // The plugin may not be fully initialized. @@ -35,7 +35,7 @@ final class ConnectorI implements IceInternal.Connector, java.lang.Comparable java.nio.channels.SocketChannel fd = IceInternal.Network.createTcpSocket(); IceInternal.Network.setBlock(fd, false); IceInternal.Network.setTcpBufSize(fd, _instance.communicator().getProperties(), _logger); - boolean connected = IceInternal.Network.doConnect(fd, _addr, timeout); + boolean connected = IceInternal.Network.doConnect(fd, _addr); try { javax.net.ssl.SSLEngine engine = _instance.createSSLEngine(false); diff --git a/java/src/IceSSL/TransceiverI.java b/java/src/IceSSL/TransceiverI.java index eb983f4b59a..3ef8a5c63d7 100644 --- a/java/src/IceSSL/TransceiverI.java +++ b/java/src/IceSSL/TransceiverI.java @@ -22,42 +22,29 @@ final class TransceiverI implements IceInternal.Transceiver return _fd; } - // - // All methods that can write to the socket are synchronized. - // - public synchronized IceInternal.SocketStatus - initialize(int timeout) + public IceInternal.SocketStatus + initialize() { try { - if(_state == StateNeedConnect && timeout == 0) + if(_state == StateNeedConnect) { _state = StateConnectPending; return IceInternal.SocketStatus.NeedConnect; } else if(_state <= StateConnectPending) { - IceInternal.Network.doFinishConnect(_fd, timeout); + IceInternal.Network.doFinishConnect(_fd); _state = StateConnected; _desc = IceInternal.Network.fdToString(_fd); } assert(_state == StateConnected); - IceInternal.SocketStatus status; - do + IceInternal.SocketStatus status = handshakeNonBlocking(); + if(status != IceInternal.SocketStatus.Finished) { - status = handshakeNonBlocking(); - if(timeout == 0) - { - return status; - } - - if(status != IceInternal.SocketStatus.Finished) - { - handleSocketStatus(status, timeout); - } + return status; } - while(status != IceInternal.SocketStatus.Finished); } catch(Ice.LocalException ex) { @@ -72,10 +59,6 @@ final class TransceiverI implements IceInternal.Transceiver return IceInternal.SocketStatus.Finished; } - // - // All methods that can write to the socket are synchronized. - // - public void close() { @@ -87,47 +70,60 @@ final class TransceiverI implements IceInternal.Transceiver assert(_fd != null); - if(_readSelector != null) + if(_state >= StateConnected) { try { - _readSelector.close(); + // + // Send the close_notify message. + // + _engine.closeOutbound(); + _netOutput.clear(); + while(!_engine.isOutboundDone()) + { + _engine.wrap(_emptyBuffer, _netOutput); + try + { + // + // Note: we can't block to send the close_notify message. In some cases, the + // close_notify message might therefore not be receieved by the peer. This is + // not a big issue since the Ice protocol isn't subject to truncation attacks. + // + flushNonBlocking(); + } + catch(Ice.LocalException ex) + { + // Ignore. + } + } } - catch(java.io.IOException ex) + catch(SSLException ex) { - // Ignore. + // + // We can't throw in close. + // + // Ice.SecurityException se = new Ice.SecurityException(); + // se.reason = "IceSSL: SSL failure while shutting down socket"; + // se.initCause(ex); + // throw se; } - _readSelector = null; - } - if(_writeSelector != null) - { try { - _writeSelector.close(); + _engine.closeInbound(); } - catch(java.io.IOException ex) + catch(SSLException ex) { - // Ignore. + // + // SSLEngine always raises an exception with this message: + // + // Inbound closed before receiving peer's close_notify: possible truncation attack? + // + // We would probably need to wait for a response in shutdown() to avoid this. + // For now, we'll ignore this exception. + // + //_logger.error("IceSSL: error during close\n" + ex.getMessage()); } - _writeSelector = null; - } - - try - { - _engine.closeInbound(); - } - catch(SSLException ex) - { - // - // SSLEngine always raises an exception with this message: - // - // Inbound closed before receiving peer's close_notify: possible truncation attack? - // - // We would probably need to wait for a response in shutdown() to avoid this. - // For now, we'll ignore this exception. - // - //_logger.error("IceSSL: error during close\n" + ex.getMessage()); } try @@ -140,104 +136,8 @@ final class TransceiverI implements IceInternal.Transceiver } } - // - // All methods that can write to the socket are synchronized. - // - public synchronized void - shutdownWrite() - { - if(_state < StateConnected) - { - return; - } - - if(_instance.networkTraceLevel() >= 2) - { - String s = "shutting down ssl connection for writing\n" + toString(); - _logger.trace(_instance.networkTraceCategory(), s); - } - - shutdown(); - - assert(_fd != null); - java.net.Socket socket = _fd.socket(); - try - { - socket.shutdownOutput(); // Shutdown socket for writing. - } - catch(java.net.SocketException ex) - { - // - // Ignore. We can't reliably figure out if the socket - // exception is because the socket is not connected. - // - // if(!IceInternal.Network.notConnected(ex)) - // { - // Ice.SocketException se = new Ice.SocketException(); - // se.initCause(ex); - // throw se; - // } - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - - // - // All methods that can write to the socket are synchronized. - // - public synchronized void - shutdownReadWrite() - { - if(_state < StateConnected) - { - return; - } - - if(_instance.networkTraceLevel() >= 2) - { - String s = "shutting down ssl connection for reading and writing\n" + toString(); - _logger.trace(_instance.networkTraceCategory(), s); - } - - shutdown(); - - assert(_fd != null); - java.net.Socket socket = _fd.socket(); - try - { - socket.shutdownInput(); // Shutdown socket for reading - socket.shutdownOutput(); // Shutdown socket for writing - } - catch(java.net.SocketException ex) - { - // - // Ignore. We can't reliably figure out if the socket - // exception is because the socket is not connected. - // - // if(!IceInternal.Network.notConnected(ex)) - // { - // Ice.SocketException se = new Ice.SocketException(); - // se.initCause(ex); - // throw se; - // } - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - - // - // All methods that can write to the socket are synchronized. - // - public synchronized boolean - write(IceInternal.Buffer buf, int timeout) + public boolean + write(IceInternal.Buffer buf) { // // If the handshake isn't completed yet, we shouldn't be writing. @@ -247,28 +147,17 @@ final class TransceiverI implements IceInternal.Transceiver throw new Ice.ConnectionLostException(); } - IceInternal.SocketStatus status; - do + IceInternal.SocketStatus status = writeNonBlocking(buf.b); + if(status != IceInternal.SocketStatus.Finished) { - status = writeNonBlocking(buf.b); - if(status != IceInternal.SocketStatus.Finished) - { - if(timeout == 0) - { - assert(status == IceInternal.SocketStatus.NeedWrite); - return false; - } - - handleSocketStatus(status, timeout); - } - } - while(status != IceInternal.SocketStatus.Finished); - + assert(status == IceInternal.SocketStatus.NeedWrite); + return false; + } return true; } public boolean - read(IceInternal.Buffer buf, int timeout, Ice.BooleanHolder moreData) + read(IceInternal.Buffer buf, Ice.BooleanHolder moreData) { // // If the handshake isn't completed yet, we shouldn't be reading (read can be @@ -324,23 +213,13 @@ final class TransceiverI implements IceInternal.Transceiver } case BUFFER_UNDERFLOW: { - IceInternal.SocketStatus status; - do + IceInternal.SocketStatus status = readNonBlocking(); + if(status != IceInternal.SocketStatus.Finished) { - status = readNonBlocking(); - if(status != IceInternal.SocketStatus.Finished) - { - if(timeout == 0) - { - assert(status == IceInternal.SocketStatus.NeedRead); - moreData.value = false; - return false; - } - - handleSocketStatus(status, timeout); - } - } - while(status != IceInternal.SocketStatus.Finished); + assert(status == IceInternal.SocketStatus.NeedRead); + moreData.value = false; + return false; + } continue; } case CLOSED: @@ -639,50 +518,6 @@ final class TransceiverI implements IceInternal.Transceiver private void shutdown() { - // - // Send the close_notify message. - // - _engine.closeOutbound(); - try - { - _netOutput.clear(); - while(!_engine.isOutboundDone()) - { - _engine.wrap(_emptyBuffer, _netOutput); - try - { - // - // We can't block to send the close_notify message as this is called from - // shutdownWrite and shutdownReadWrite which aren't suppose to block. In - // some cases, the close_notify message might therefore not be receieved - // by the peer. This is not a big issue since the Ice protocol isn't - // subject to truncation attacks. - // -// IceInternal.SocketStatus status; -// do -// { -// status = flushNonBlocking(); -// if(status != IceInternal.SocketStatus.Finished) -// { -// handleSocketStatus(status, -1); // TODO: Is waiting indefinitely really correct? -// } -// } -// while(status != IceInternal.SocketStatus.Finished); - flushNonBlocking(); - } - catch(Ice.ConnectionLostException ex) - { - // Ignore. - } - } - } - catch(SSLException ex) - { - Ice.SecurityException se = new Ice.SecurityException(); - se.reason = "IceSSL: SSL failure while shutting down socket"; - se.initCause(ex); - throw se; - } } private IceInternal.SocketStatus @@ -932,74 +767,6 @@ final class TransceiverI implements IceInternal.Transceiver _appInput.compact(); } - private void - handleSocketStatus(IceInternal.SocketStatus status, int timeout) - { - assert(timeout != 0); - try - { - java.nio.channels.Selector selector; - if(status == IceInternal.SocketStatus.NeedRead) - { - if(_readSelector == null) - { - _readSelector = java.nio.channels.Selector.open(); - _fd.register(_readSelector, java.nio.channels.SelectionKey.OP_READ, null); - } - selector = _readSelector; - } - else - { - assert(status == IceInternal.SocketStatus.NeedWrite); - if(_writeSelector == null) - { - _writeSelector = java.nio.channels.Selector.open(); - _fd.register(_writeSelector, java.nio.channels.SelectionKey.OP_WRITE, null); - } - selector = _writeSelector; - } - - while(true) - { - try - { - if(timeout > 0) - { - long start = System.currentTimeMillis(); - int n = selector.select(timeout); - if(n == 0 && System.currentTimeMillis() >= start + timeout) - { - throw new Ice.TimeoutException(); - } - } - else - { - selector.select(); - } - - break; - } - catch(java.io.InterruptedIOException ex) - { - // Ignore. - } - } - } - catch(java.io.IOException ex) - { - if(IceInternal.Network.connectionLost(ex)) - { - Ice.ConnectionLostException se = new Ice.ConnectionLostException(); - se.initCause(ex); - throw se; - } - - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - private Instance _instance; private java.nio.channels.SocketChannel _fd; private javax.net.ssl.SSLEngine _engine; @@ -1015,8 +782,6 @@ final class TransceiverI implements IceInternal.Transceiver private ByteBuffer _netInput; // Holds encrypted data read from the socket. private ByteBuffer _netOutput; // Holds encrypted data to be written to the socket. private static ByteBuffer _emptyBuffer = ByteBuffer.allocate(0); // Used during handshaking. - private java.nio.channels.Selector _readSelector; - private java.nio.channels.Selector _writeSelector; private ConnectionInfo _info; private static final int StateNeedConnect = 0; |