diff options
author | Mark Spruiell <mes@zeroc.com> | 2004-12-08 18:59:41 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2004-12-08 18:59:41 +0000 |
commit | d821c757210e7f42bf78fdcd6699464786546654 (patch) | |
tree | 9ab20a747e779dfceeab9e4f896f17de6ad1ea43 /java/src | |
parent | Added regular expression matching to property checking code. (diff) | |
download | ice-d821c757210e7f42bf78fdcd6699464786546654.tar.bz2 ice-d821c757210e7f42bf78fdcd6699464786546654.tar.xz ice-d821c757210e7f42bf78fdcd6699464786546654.zip |
adding thread-per-connection
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 1103 | ||||
-rw-r--r-- | java/src/IceInternal/Acceptor.java | 1 | ||||
-rw-r--r-- | java/src/IceInternal/IncomingConnectionFactory.java | 378 | ||||
-rw-r--r-- | java/src/IceInternal/Instance.java | 29 | ||||
-rw-r--r-- | java/src/IceInternal/Network.java | 15 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingConnectionFactory.java | 15 | ||||
-rw-r--r-- | java/src/IceInternal/TcpAcceptor.java | 9 | ||||
-rw-r--r-- | java/src/IceInternal/TcpTransceiver.java | 36 | ||||
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 6 | ||||
-rw-r--r-- | java/src/IceInternal/Transceiver.java | 3 | ||||
-rw-r--r-- | java/src/IceInternal/UdpTransceiver.java | 20 |
11 files changed, 1197 insertions, 418 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index f02222732f6..c8b82685d6e 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -27,6 +27,34 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public synchronized void validate() { + if(_instance.threadPerConnection() && _threadPerConnection != Thread.currentThread()) + { + // + // In thread per connection mode, this connection's thread + // will take care of connection validation. Therefore all we + // have to do here is to wait until this thread has completed + // validation. + // + while(_state == StateNotValidated) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + if(_state >= StateClosing) + { + assert(_exception != null); + throw _exception; + } + + return; + } + assert(_state == StateNotValidated); if(!_endpoint.datagram()) // Datagram connections are always implicitly validated. @@ -42,10 +70,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // with respect to connection validation. // IceInternal.BasicStream os = new IceInternal.BasicStream(_instance); - os.writeByte(IceInternal.Protocol.magic[0]); - os.writeByte(IceInternal.Protocol.magic[1]); - os.writeByte(IceInternal.Protocol.magic[2]); - os.writeByte(IceInternal.Protocol.magic[3]); + os.writeBlob(IceInternal.Protocol.magic); os.writeByte(IceInternal.Protocol.protocolMajor); os.writeByte(IceInternal.Protocol.protocolMinor); os.writeByte(IceInternal.Protocol.encodingMajor); @@ -70,13 +95,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _transceiver.read(is, _endpoint.timeout()); assert(is.pos() == IceInternal.Protocol.headerSize); is.pos(0); - byte[] m = new byte[4]; - m[0] = is.readByte(); - m[1] = is.readByte(); - m[2] = is.readByte(); - m[3] = is.readByte(); - if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] - || m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) + byte[] m = is.readBlob(4); + if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || + m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) { BadMagicException ex = new BadMagicException(); ex.badMagic = m; @@ -186,22 +207,31 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } public synchronized boolean - isValidated() - { - return _state > StateNotValidated; - } - - public synchronized boolean isDestroyed() { return _state >= StateClosing; } - public synchronized boolean + public boolean isFinished() { - if(_transceiver == null && _dispatchCount == 0) + Thread threadPerConnection = null; + + synchronized(this) { + if(_transceiver != null || _dispatchCount != 0 || + (_threadPerConnection != null && + _threadPerConnection != Thread.currentThread() && + _threadPerConnection.isAlive())) + { + return false; + } + + assert(_state == StateClosed); + + threadPerConnection = _threadPerConnection; + _threadPerConnection = null; + // // We must destroy the incoming cache. It is now not // needed anymore. @@ -214,13 +244,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _incomingCache = _incomingCache.next; } } - - return true; } - else + + if(threadPerConnection != null && threadPerConnection != Thread.currentThread()) { - return false; + try + { + threadPerConnection.join(); + } + catch(InterruptedException ex) + { + } } + + return true; } public synchronized void @@ -238,88 +275,107 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - public synchronized void + public void waitUntilFinished() { - // - // We wait indefinitely until connection closing has been - // initiated. We also wait indefinitely until all outstanding - // requests are completed. Otherwise we couldn't guarantee - // that there are no outstanding calls when deactivate() is - // called on the servant locators. - // - while(_state < StateClosing || _dispatchCount > 0) + Thread threadPerConnection = null; + + synchronized(this) { - try - { - wait(); - } - catch(InterruptedException ex) + // + // We wait indefinitely until connection closing has been + // initiated. We also wait indefinitely until all outstanding + // requests are completed. Otherwise we couldn't guarantee + // that there are no outstanding calls when deactivate() is + // called on the servant locators. + // + while(_state < StateClosing || _dispatchCount > 0) { + try + { + wait(); + } + catch(InterruptedException ex) + { + } } - } - - // - // Now we must wait until close() has been called on the - // transceiver. - // - while(_transceiver != null) - { - try + + // + // Now we must wait until close() has been called on the + // transceiver. + // + while(_transceiver != null) { - if(_state != StateClosed && _endpoint.timeout() >= 0) + try { - long absoluteWaitTime = _stateTime + _endpoint.timeout(); - long waitTime = absoluteWaitTime - System.currentTimeMillis(); - - if(waitTime > 0) + if(_state != StateClosed && _endpoint.timeout() >= 0) { - // - // We must wait a bit longer until we close - // this connection. - // - wait(waitTime); - if(System.currentTimeMillis() >= absoluteWaitTime) + long absoluteWaitTime = _stateTime + _endpoint.timeout(); + long waitTime = absoluteWaitTime - System.currentTimeMillis(); + + if(waitTime > 0) + { + // + // We must wait a bit longer until we close this + // connection. + // + wait(waitTime); + if(System.currentTimeMillis() >= absoluteWaitTime) + { + setState(StateClosed, new CloseTimeoutException()); + } + } + else { + // + // We already waited long enough, so let's close this + // connection! + // setState(StateClosed, new CloseTimeoutException()); } + + // + // No return here, we must still wait until + // close() is called on the _transceiver. + // } else { - // - // We already waited long enough, so let's - // close this connection! - // - setState(StateClosed, new CloseTimeoutException()); + wait(); } - - // - // No return here, we must still wait until - // close() is called on the _transceiver. - // } - else + catch(InterruptedException ex) { - wait(); } } - catch(InterruptedException ex) + + assert(_state == StateClosed); + + threadPerConnection = _threadPerConnection; + _threadPerConnection = null; + + // + // We must destroy the incoming cache. It is now not + // needed anymore. + // + synchronized(_incomingCacheMutex) { + while(_incomingCache != null) + { + _incomingCache.__destroy(); + _incomingCache = _incomingCache.next; + } } } - assert(_state == StateClosed); - - // - // We must destroy the incoming cache. It is now not - // needed anymore. - // - synchronized(_incomingCacheMutex) + if(threadPerConnection != null && threadPerConnection != Thread.currentThread()) { - while(_incomingCache != null) + try + { + threadPerConnection.join(); + } + catch(InterruptedException ex) { - _incomingCache.__destroy(); - _incomingCache = _incomingCache.next; } } } @@ -388,56 +444,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne os.writeBlob(_requestHdr); } - private IceInternal.BasicStream - doCompress(IceInternal.BasicStream uncompressed, boolean compress) - { - if(_compressionSupported) - { - if(compress && uncompressed.size() >= 100) - { - // - // Do compression. - // - IceInternal.BasicStream cstream = uncompressed.compress(IceInternal.Protocol.headerSize); - if(cstream != null) - { - // - // Set compression status. - // - cstream.pos(9); - cstream.writeByte((byte)2); - - // - // Write the size of the compressed stream into the header. - // - cstream.pos(10); - cstream.writeInt(cstream.size()); - - // - // Write the compression status and size of the compressed stream into the header of the - // uncompressed stream -- we need this to trace requests correctly. - // - uncompressed.pos(9); - uncompressed.writeByte((byte)2); - uncompressed.writeInt(cstream.size()); - - return cstream; - } - } - } - - uncompressed.pos(9); - uncompressed.writeByte((byte)((_compressionSupported && compress) ? 1 : 0)); - - // - // Not compressed, fill in the message size. - // - uncompressed.pos(10); - uncompressed.writeInt(uncompressed.size()); - - return uncompressed; - } - public void sendRequest(IceInternal.BasicStream os, IceInternal.Outgoing out, boolean compress) { @@ -1046,18 +1052,22 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public boolean datagram() { - return _endpoint.datagram(); + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. + return _endpoint.datagram(); // No mutex protection necessary, _endpoint is immutable. } public boolean readable() { + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. return true; } public void read(IceInternal.BasicStream stream) { + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. + _transceiver.read(stream, 0); // @@ -1086,13 +1096,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public void message(IceInternal.BasicStream stream, IceInternal.ThreadPool threadPool) { - byte compress = 0; - int requestId = 0; - int invokeNum = 0; - IceInternal.ServantManager servantManager = null; - ObjectAdapter adapter = null; - IceInternal.OutgoingAsync outAsync = null; - boolean destroyStream = false; + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. + + MessageInfo info = new MessageInfo(stream); synchronized(this) { @@ -1103,156 +1109,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // threadPool.promoteFollower(); - assert(_state > StateNotValidated); - - if(_state == StateClosed) + if(_state != StateClosed) { - return; + parseMessage(info); } - if(_acmTimeout > 0) + // + // parseMessage() can close the connection, so we must check + // for closed state again. + // + if(_state == StateClosed) { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + return; } - - try - { - // - // We don't need to check magic and version here. This - // has already been done by the ThreadPool, which - // provides us the stream. - // - assert(stream.pos() == stream.size()); - stream.pos(8); - byte messageType = stream.readByte(); - compress = stream.readByte(); - if(compress == (byte)2) - { - if(_compressionSupported) - { - IceInternal.BasicStream ustream = stream.uncompress(IceInternal.Protocol.headerSize); - if(ustream != stream) - { - destroyStream = true; - stream = ustream; - } - } - else - { - throw new CompressionNotSupportedException(); - } - } - stream.pos(IceInternal.Protocol.headerSize); - - switch(messageType) - { - case IceInternal.Protocol.closeConnectionMsg: - { - IceInternal.TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels); - if(_endpoint.datagram() && _warn) - { - _logger.warning("ignoring close connection message for datagram connection:\n" + _desc); - } - else - { - setState(StateClosed, new CloseConnectionException()); - } - break; - } - - case IceInternal.Protocol.requestMsg: - { - if(_state == StateClosing) - { - IceInternal.TraceUtil.traceRequest("received request during closing\n" + - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); - } - else - { - IceInternal.TraceUtil.traceRequest("received request", stream, _logger, _traceLevels); - requestId = stream.readInt(); - invokeNum = 1; - servantManager = _servantManager; - adapter = _adapter; - ++_dispatchCount; - } - break; - } - - case IceInternal.Protocol.requestBatchMsg: - { - if(_state == StateClosing) - { - IceInternal.TraceUtil.traceBatchRequest("received batch request during closing\n" + - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); - } - else - { - IceInternal.TraceUtil.traceBatchRequest("received batch request", stream, _logger, - _traceLevels); - invokeNum = stream.readInt(); - if(invokeNum < 0) - { - throw new NegativeSizeException(); - } - servantManager = _servantManager; - adapter = _adapter; - _dispatchCount += invokeNum; - } - break; - } - - case IceInternal.Protocol.replyMsg: - { - IceInternal.TraceUtil.traceReply("received reply", stream, _logger, _traceLevels); - requestId = stream.readInt(); - IceInternal.Outgoing out = (IceInternal.Outgoing)_requests.remove(requestId); - if(out != null) - { - out.finished(stream); - } - else - { - outAsync = (IceInternal.OutgoingAsync)_asyncRequests.remove(requestId); - if(outAsync == null) - { - throw new UnknownRequestIdException(); - } - } - break; - } - - case IceInternal.Protocol.validateConnectionMsg: - { - IceInternal.TraceUtil.traceHeader("received validate connection", stream, _logger, - _traceLevels); - if(_warn) - { - _logger.warning("ignoring unexpected validate connection message:\n" + _desc); - } - break; - } - - default: - { - IceInternal.TraceUtil.traceHeader("received unknown message\n" + - "(invalid, closing connection)", stream, _logger, - _traceLevels); - throw new UnknownMessageException(); - } - } - } - catch(LocalException ex) - { - if(destroyStream) - { - stream.destroy(); - } - setState(StateClosed, ex); - return; - } } try @@ -1261,9 +1130,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // Asynchronous replies must be handled outside the thread // synchronization, so that nested calls are possible. // - if(outAsync != null) + if(info.outAsync != null) { - outAsync.__finished(stream); + info.outAsync.__finished(stream); } // @@ -1271,13 +1140,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // must be done outside the thread synchronization, so that nested // calls are possible. // - invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter); + invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter); } finally { - if(destroyStream) + if(info.destroyStream) { - stream.destroy(); + info.stream.destroy(); } } } @@ -1285,6 +1154,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public void finished(IceInternal.ThreadPool threadPool) { + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. + threadPool.promoteFollower(); LocalException exception = null; @@ -1421,15 +1292,60 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_adapter != null) { - _threadPool = ((ObjectAdapterI)_adapter).getThreadPool(); _servantManager = ((ObjectAdapterI)_adapter).getServantManager(); } else { - _threadPool = _instance.clientThreadPool(); _servantManager = null; } + if(!_instance.threadPerConnection()) + { + // + // Only set _threadPool if we really need it, i.e., if we are + // not in thread per connection mode. Thread pools have lazy + // initialization in Instance, and we don't want them to be + // created if they are not needed. + // + if(_adapter != null) + { + _threadPool = ((ObjectAdapterI)_adapter).getThreadPool(); + } + else + { + _threadPool = _instance.clientThreadPool(); + } + } + else + { + _threadPool = null; + + // + // If we are in thread per connection mode, create the thread + // for this connection. + // + try + { + _threadPerConnection = new ThreadPerConnection(); + _threadPerConnection.start(); + } + catch(RuntimeException ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "cannot create thread for connection:\n" + sw.toString(); + _instance.logger().error(s); + + _state = StateClosed; + _transceiver = null; + _threadPerConnection = null; + + throw ex; + } + } + _overrideCompress = _instance.defaultsAndOverrides().overrideCompress; _overrideCompressValue = _instance.defaultsAndOverrides().overrideCompressValue; } @@ -1441,6 +1357,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(_state == StateClosed); assert(_transceiver == null); assert(_dispatchCount == 0); + assert(_threadPerConnection == null); assert(_incomingCache == null); _batchStream.destroy(); @@ -1537,7 +1454,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { return; } - registerWithPool(); + if(!_instance.threadPerConnection()) + { + registerWithPool(); + } break; } @@ -1551,7 +1471,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { return; } - unregisterWithPool(); + if(!_instance.threadPerConnection()) + { + unregisterWithPool(); + } break; } @@ -1564,25 +1487,36 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { return; } - registerWithPool(); // We need to continue to read in closing state. + if(!_instance.threadPerConnection()) + { + registerWithPool(); // We need to continue to read in closing state. + } break; } case StateClosed: { - // - // If we change from not validated, we can close right - // away. Otherwise we first must make sure that we are - // registered, then we unregister, and let finished() - // do the close. - // - if(_state == StateNotValidated) + if(_instance.threadPerConnection()) + { + // + // If we are in thread per connection mode, we + // shutdown both for reading and writing. This will + // unblock and read call with an exception. The thread + // per connection then closes the transceiver. + // + _transceiver.shutdownReadWrite(); + } + else if(_state == StateNotValidated) { + // + // If we change from not validated, we can close right + // away. + // assert(!_registeredWithPool); // - // We must make sure that nobody is sending when - // we close the transceiver. + // We must make sure that nobody is sending when we + // close the transceiver. // synchronized(_sendMutex) { @@ -1601,16 +1535,40 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } else { + // + // Otherwise we first must make sure that we are + // registered, then we unregister, and let finished() + // do the close. + // registerWithPool(); unregisterWithPool(); } - break; } } + // + // We only register with the connection monitor if our new state + // is StateActive. Otherwise we unregister with the connection + // monitor, but only if we were registered before, i.e., if our + // old state was StateActive. + // + IceInternal.ConnectionMonitor connectionMonitor = _instance.connectionMonitor(); + if(connectionMonitor != null) + { + if(state == StateActive) + { + connectionMonitor.add(this); + } + else if(_state == StateActive) + { + connectionMonitor.remove(this); + } + } + _state = state; _stateTime = System.currentTimeMillis(); + notifyAll(); if(_state == StateClosing && _dispatchCount == 0) @@ -1641,10 +1599,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // message. // IceInternal.BasicStream os = new IceInternal.BasicStream(_instance); - os.writeByte(IceInternal.Protocol.magic[0]); - os.writeByte(IceInternal.Protocol.magic[1]); - os.writeByte(IceInternal.Protocol.magic[2]); - os.writeByte(IceInternal.Protocol.magic[3]); + os.writeBlob(IceInternal.Protocol.magic); os.writeByte(IceInternal.Protocol.protocolMajor); os.writeByte(IceInternal.Protocol.protocolMinor); os.writeByte(IceInternal.Protocol.encodingMajor); @@ -1658,7 +1613,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // IceInternal.TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels); _transceiver.write(os, _endpoint.timeout()); - _transceiver.shutdown(); + _transceiver.shutdownWrite(); } } } @@ -1666,31 +1621,242 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private void registerWithPool() { + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. + if(!_registeredWithPool) { _threadPool._register(_transceiver.fd(), this); _registeredWithPool = true; - - IceInternal.ConnectionMonitor connectionMonitor = _instance.connectionMonitor(); - if(connectionMonitor != null) - { - connectionMonitor.add(this); - } } } private void unregisterWithPool() { + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. + if(_registeredWithPool) { _threadPool.unregister(_transceiver.fd()); _registeredWithPool = false; + } + } - IceInternal.ConnectionMonitor connectionMonitor = _instance.connectionMonitor(); - if(connectionMonitor != null) + private IceInternal.BasicStream + doCompress(IceInternal.BasicStream uncompressed, boolean compress) + { + if(_compressionSupported) + { + if(compress && uncompressed.size() >= 100) { - connectionMonitor.remove(this); + // + // Do compression. + // + IceInternal.BasicStream cstream = uncompressed.compress(IceInternal.Protocol.headerSize); + if(cstream != null) + { + // + // Set compression status. + // + cstream.pos(9); + cstream.writeByte((byte)2); + + // + // Write the size of the compressed stream into the header. + // + cstream.pos(10); + cstream.writeInt(cstream.size()); + + // + // Write the compression status and size of the compressed stream into the header of the + // uncompressed stream -- we need this to trace requests correctly. + // + uncompressed.pos(9); + uncompressed.writeByte((byte)2); + uncompressed.writeInt(cstream.size()); + + return cstream; + } + } + } + + uncompressed.pos(9); + uncompressed.writeByte((byte)((_compressionSupported && compress) ? 1 : 0)); + + // + // Not compressed, fill in the message size. + // + uncompressed.pos(10); + uncompressed.writeInt(uncompressed.size()); + + return uncompressed; + } + + private static class MessageInfo + { + MessageInfo(IceInternal.BasicStream stream) + { + this.stream = stream; + } + + IceInternal.BasicStream stream; + boolean destroyStream; + int invokeNum; + int requestId; + byte compress; + IceInternal.ServantManager servantManager; + ObjectAdapter adapter; + IceInternal.OutgoingAsync outAsync; + } + + private void + parseMessage(MessageInfo info) + { + assert(_state > StateNotValidated && _state < StateClosed); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } + + try + { + // + // We don't need to check magic and version here. This has + // already been done by the ThreadPool or ThreadPerConnection, + // which provides us with the stream. + // + assert(info.stream.pos() == info.stream.size()); + info.stream.pos(8); + byte messageType = info.stream.readByte(); + info.compress = info.stream.readByte(); + if(info.compress == (byte)2) + { + if(_compressionSupported) + { + IceInternal.BasicStream ustream = info.stream.uncompress(IceInternal.Protocol.headerSize); + if(ustream != info.stream) + { + info.destroyStream = true; + info.stream = ustream; + } + } + else + { + throw new CompressionNotSupportedException(); + } + } + info.stream.pos(IceInternal.Protocol.headerSize); + + switch(messageType) + { + case IceInternal.Protocol.closeConnectionMsg: + { + IceInternal.TraceUtil.traceHeader("received close connection", info.stream, _logger, _traceLevels); + if(_endpoint.datagram() && _warn) + { + _logger.warning("ignoring close connection message for datagram connection:\n" + _desc); + } + else + { + setState(StateClosed, new CloseConnectionException()); + } + break; + } + + case IceInternal.Protocol.requestMsg: + { + if(_state == StateClosing) + { + IceInternal.TraceUtil.traceRequest("received request during closing\n" + + "(ignored by server, client will retry)", + info.stream, _logger, _traceLevels); + } + else + { + IceInternal.TraceUtil.traceRequest("received request", info.stream, _logger, _traceLevels); + info.requestId = info.stream.readInt(); + info.invokeNum = 1; + info.servantManager = _servantManager; + info.adapter = _adapter; + ++_dispatchCount; + } + break; + } + + case IceInternal.Protocol.requestBatchMsg: + { + if(_state == StateClosing) + { + IceInternal.TraceUtil.traceBatchRequest("received batch request during closing\n" + + "(ignored by server, client will retry)", + info.stream, _logger, _traceLevels); + } + else + { + IceInternal.TraceUtil.traceBatchRequest("received batch request", info.stream, _logger, + _traceLevels); + info.invokeNum = info.stream.readInt(); + if(info.invokeNum < 0) + { + info.invokeNum = 0; + throw new NegativeSizeException(); + } + info.servantManager = _servantManager; + info.adapter = _adapter; + _dispatchCount += info.invokeNum; + } + break; + } + + case IceInternal.Protocol.replyMsg: + { + IceInternal.TraceUtil.traceReply("received reply", info.stream, _logger, _traceLevels); + info.requestId = info.stream.readInt(); + IceInternal.Outgoing out = (IceInternal.Outgoing)_requests.remove(info.requestId); + if(out != null) + { + out.finished(info.stream); + } + else + { + info.outAsync = (IceInternal.OutgoingAsync)_asyncRequests.remove(info.requestId); + if(info.outAsync == null) + { + throw new UnknownRequestIdException(); + } + } + break; + } + + case IceInternal.Protocol.validateConnectionMsg: + { + IceInternal.TraceUtil.traceHeader("received validate connection", info.stream, _logger, + _traceLevels); + if(_warn) + { + _logger.warning("ignoring unexpected validate connection message:\n" + _desc); + } + break; + } + + default: + { + IceInternal.TraceUtil.traceHeader("received unknown message\n" + + "(invalid, closing connection)", info.stream, _logger, + _traceLevels); + throw new UnknownMessageException(); + } + } + } + catch(LocalException ex) + { + setState(StateClosed, ex); + + if(info.destroyStream) + { + info.stream.destroy(); + info.destroyStream = false; } } } @@ -1810,26 +1976,289 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } private void + run() + { + try + { + // + // First we must validate and activate this connection. This must + // be done here, and not in the connection factory. Please see the + // comments in the connection factory for details. + // + validate(); + } + catch(LocalException ex) + { + synchronized(this) + { + assert(_state == StateClosed); + + // + // We must make sure that nobody is sending when we close the + // transceiver. + // + synchronized(_sendMutex) + { + try + { + _transceiver.close(); + } + catch(LocalException e) + { + // Here we ignore any exceptions in close(). + } + + _transceiver = null; + notifyAll(); + } + } + return; + } + + activate(); + + boolean warnUdp = _instance.properties().getPropertyAsInt("Ice.Warn.Datagrams") > 0; + + boolean closed = false; + + while(!closed) + { + // + // We must accept new connections outside the thread + // synchronization, because we use blocking accept. + // + + IceInternal.BasicStream stream = new IceInternal.BasicStream(_instance); + + try + { + stream.resize(IceInternal.Protocol.headerSize, true); + stream.pos(0); + _transceiver.read(stream, -1); + + int pos = stream.pos(); + assert(pos >= IceInternal.Protocol.headerSize); + stream.pos(0); + byte[] m = stream.readBlob(4); + if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || + m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) + { + BadMagicException ex = new BadMagicException(); + ex.badMagic = m; + throw ex; + } + byte pMajor = stream.readByte(); + byte pMinor = stream.readByte(); + if(pMajor != IceInternal.Protocol.protocolMajor) + { + UnsupportedProtocolException e = new UnsupportedProtocolException(); + e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor; + e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor; + e.major = IceInternal.Protocol.protocolMajor; + e.minor = IceInternal.Protocol.protocolMinor; + throw e; + } + byte eMajor = stream.readByte(); + byte eMinor = stream.readByte(); + if(eMajor != IceInternal.Protocol.encodingMajor) + { + UnsupportedEncodingException e = new UnsupportedEncodingException(); + e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor; + e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor; + e.major = IceInternal.Protocol.encodingMajor; + e.minor = IceInternal.Protocol.encodingMinor; + throw e; + } + byte messageType = stream.readByte(); + byte compress = stream.readByte(); + int size = stream.readInt(); + if(size < IceInternal.Protocol.headerSize) + { + throw new IllegalMessageSizeException(); + } + if(size > _instance.messageSizeMax()) + { + throw new MemoryLimitException(); + } + if(size > stream.size()) + { + stream.resize(size, true); + } + stream.pos(pos); + + if(pos != stream.size()) + { + if(_endpoint.datagram()) + { + if(warnUdp) + { + _logger.warning("DatagramLimitException: maximum size of " + pos + " exceeded"); + } + throw new DatagramLimitException(); + } + else + { + _transceiver.read(stream, -1); + assert(stream.pos() == stream.size()); + } + } + } + catch(DatagramLimitException ex) // Expected. + { + continue; + } + catch(LocalException ex) + { + exception(ex); + } + + MessageInfo info = new MessageInfo(stream); + + LocalException exception = null; + + IceInternal.IntMap requests = null; + IceInternal.IntMap asyncRequests = null; + + try + { + synchronized(this) + { + while(_state == StateHolding) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + if(_state != StateClosed) + { + parseMessage(info); + } + + // + // parseMessage() can close the connection, so we must + // check for closed state again. + // + if(_state == StateClosed) + { + // + // We must make sure that nobody is sending when we close + // the transceiver. + // + synchronized(_sendMutex) + { + try + { + _transceiver.close(); + } + catch(LocalException ex) + { + exception = ex; + } + + _transceiver = null; + notifyAll(); + } + + // + // We cannot simply return here. We have to make sure + // that all requests (regular and async) are notified + // about the closed connection below. + // + closed = true; + } + + if(_state == StateClosed || _state == StateClosing) + { + requests = _requests; + _requests = new IceInternal.IntMap(); + + asyncRequests = _asyncRequests; + _asyncRequests = new IceInternal.IntMap(); + } + } + + // + // Asynchronous replies must be handled outside the thread + // synchronization, so that nested calls are possible. + // + if(info.outAsync != null) + { + info.outAsync.__finished(info.stream); + } + + // + // Method invocation (or multiple invocations for batch messages) + // must be done outside the thread synchronization, so that nested + // calls are possible. + // + invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, + info.adapter); + + if(requests != null) + { + java.util.Iterator i = requests.entryIterator(); + while(i.hasNext()) + { + IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next(); + IceInternal.Outgoing out = (IceInternal.Outgoing)e.getValue(); + out.finished(_exception); // The exception is immutable at this point. + } + } + + if(asyncRequests != null) + { + java.util.Iterator i = asyncRequests.entryIterator(); + while(i.hasNext()) + { + IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next(); + IceInternal.OutgoingAsync out = (IceInternal.OutgoingAsync)e.getValue(); + out.__finished(_exception); // The exception is immutable at this point. + } + } + + if(exception != null) + { + assert(closed); + throw exception; + } + } + finally + { + if(info.destroyStream) + { + info.stream.destroy(); + } + } + } + } + + private void warning(String msg, Exception ex) { java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); - Throwable t = ex; - do - { - t.printStackTrace(pw); - t = t.getCause(); - if(t != null) - { - pw.println("Caused by:\n"); - } - } - while(t != null); + ex.printStackTrace(pw); pw.flush(); String s = msg + ":\n" + sw.toString() + _desc; _logger.warning(s); } + private void + error(String msg, Exception ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = msg + ":\n" + _desc + sw.toString(); + _logger.error(s); + } + private IceInternal.Incoming getIncoming(ObjectAdapter adapter, boolean response, byte compress) { @@ -1863,6 +2292,23 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } + private class ThreadPerConnection extends Thread + { + public void + run() + { + try + { + ConnectionI.this.run(); + } + catch(Exception ex) + { + ConnectionI.this.error("exception in thread per connection", ex); + } + } + } + private Thread _threadPerConnection; + private IceInternal.Transceiver _transceiver; private final String _desc; private final String _type; @@ -1883,6 +2329,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private long _acmAbsoluteTimeoutMillis; private int _nextRequestId; + private IceInternal.IntMap _requests = new IceInternal.IntMap(); private IceInternal.IntMap _asyncRequests = new IceInternal.IntMap(); diff --git a/java/src/IceInternal/Acceptor.java b/java/src/IceInternal/Acceptor.java index 8d021c62c00..b61c5f1163a 100644 --- a/java/src/IceInternal/Acceptor.java +++ b/java/src/IceInternal/Acceptor.java @@ -15,5 +15,6 @@ public interface Acceptor void close(); void listen(); Transceiver accept(int timeout); + void connectToSelf(); String toString(); } diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index 399573dbb55..fece82a38e4 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -37,8 +37,8 @@ public class IncomingConnectionFactory extends EventHandler synchronized(this) { // - // First we wait until the connection factory itself is in - // holding state. + // First we wait until the connection factory itself is in holding + // state. // while(_state < StateHolding) { @@ -72,6 +72,7 @@ public class IncomingConnectionFactory extends EventHandler public void waitUntilFinished() { + Thread threadPerIncomingConnectionFactory = null; java.util.LinkedList connections; synchronized(this) @@ -89,19 +90,32 @@ public class IncomingConnectionFactory extends EventHandler { } } - + + assert(_state == StateClosed); + + threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory; + _threadPerIncomingConnectionFactory = null; + // - // We want to wait until all connections are finished - // outside the thread synchronization. + // We want to wait until all connections are finished outside the + // thread synchronization. // connections = _connections; _connections = new java.util.LinkedList(); } - - // - // Now we wait for until the destruction of each connection is - // finished. - // + + if(threadPerIncomingConnectionFactory != null && + threadPerIncomingConnectionFactory != Thread.currentThread()) + { + try + { + threadPerIncomingConnectionFactory.join(); + } + catch(InterruptedException ex) + { + } + } + java.util.ListIterator p = connections.listIterator(); while(p.hasNext()) { @@ -176,92 +190,101 @@ public class IncomingConnectionFactory extends EventHandler public boolean datagram() { + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. return _endpoint.datagram(); } public boolean readable() { + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. return false; } public void read(BasicStream unused) { + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. assert(false); // Must not be called. } public void message(BasicStream unused, ThreadPool threadPool) { + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. + Ice.ConnectionI connection = null; synchronized(this) { - if(_state != StateActive) + try { - Thread.yield(); - threadPool.promoteFollower(); - return; - } + if(_state != StateActive) + { + Thread.yield(); + return; + } - // - // Reap connections for which destruction has completed. - // - java.util.ListIterator p = _connections.listIterator(); - while(p.hasNext()) - { - Ice.ConnectionI con = (Ice.ConnectionI)p.next(); - if(con.isFinished()) + // + // Reap connections for which destruction has completed. + // + java.util.ListIterator p = _connections.listIterator(); + while(p.hasNext()) { - p.remove(); + Ice.ConnectionI con = (Ice.ConnectionI)p.next(); + if(con.isFinished()) + { + p.remove(); + } } - } - // - // Now accept a new connection. - // - Transceiver transceiver; - try - { - transceiver = _acceptor.accept(0); - } - catch(Ice.TimeoutException ex) - { - // Ignore timeouts. - return; - } - catch(Ice.LocalException ex) - { - // Warn about other Ice local exceptions. - if(_warn) + // + // Now accept a new connection. + // + Transceiver transceiver; + try { - warning(ex); + transceiver = _acceptor.accept(0); } - return; + catch(Ice.TimeoutException ex) + { + // Ignore timeouts. + return; + } + catch(Ice.LocalException ex) + { + // Warn about other Ice local exceptions. + if(_warn) + { + warning(ex); + } + return; + } + + assert(transceiver != null); + + // + // Create a connection object for the connection. + // + connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter); + _connections.add(connection); } finally { // - // We must promote a follower after we accepted a new - // connection, or after an exception. + // This makes sure that we promote a follower before we leave + // the scope of the mutex above, but after we call accept() + // (if we call it). // threadPool.promoteFollower(); } - - // - // Create a connection object for the connection. - // - assert(transceiver != null); - connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter); - _connections.add(connection); } assert(connection != null); // - // We validate and activate outside the thread - // synchronization, to not block the factory. + // We validate and activate outside the thread synchronization, to not block + // the factory. // try { @@ -282,6 +305,8 @@ public class IncomingConnectionFactory extends EventHandler public synchronized void finished(ThreadPool threadPool) { + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. + threadPool.promoteFollower(); if(_state == StateActive) @@ -344,7 +369,18 @@ public class IncomingConnectionFactory extends EventHandler { _endpoint = h.value; Ice.ConnectionI connection = new Ice.ConnectionI(_instance, _transceiver, _endpoint, _adapter); - connection.validate(); + + // + // In thread per connection mode, the connection's thread will + // take care of connection validation, and we don't want to + // block here waiting until validation is complete. Therefore + // we don't call validate() in thread per connection mode. + // + if(!_instance.threadPerConnection()) + { + connection.validate(); + } + _connections.add(connection); } else @@ -354,6 +390,40 @@ public class IncomingConnectionFactory extends EventHandler _endpoint = h.value; assert(_acceptor != null); _acceptor.listen(); + + if(!_instance.threadPerConnection()) + { + // + // Only set _threadPool if we really need it, i.e., if we are + // not in thread per connection mode. Thread pools have lazy + // initialization in Instance, and we don't want them to be + // created if they are not needed. + // + _threadPool = ((Ice.ObjectAdapterI)_adapter).getThreadPool(); + } + else + { + // + // If we are in thread per connection mode, we also use + // one thread per incoming connection factory, that + // accepts new connections on this endpoint. + // + try + { + _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory(); + _threadPerIncomingConnectionFactory.start(); + } + catch(RuntimeException ex) + { + error("cannot create thread for incoming connection factory", ex); + + _state = StateClosed; + _acceptor = null; + _threadPerIncomingConnectionFactory = null; + + throw ex; + } + } } } catch(RuntimeException ex) @@ -371,6 +441,7 @@ public class IncomingConnectionFactory extends EventHandler assert(_state == StateClosed); assert(_acceptor == null); assert(_connections.size() == 0); + assert(_threadPerIncomingConnectionFactory == null); // // Destroy the EventHandler's stream, so that its buffer @@ -401,7 +472,10 @@ public class IncomingConnectionFactory extends EventHandler { return; } - registerWithPool(); + if(!_instance.threadPerConnection()) + { + registerWithPool(); + } java.util.ListIterator p = _connections.listIterator(); while(p.hasNext()) @@ -418,7 +492,10 @@ public class IncomingConnectionFactory extends EventHandler { return; } - unregisterWithPool(); + if(!_instance.threadPerConnection()) + { + unregisterWithPool(); + } java.util.ListIterator p = _connections.listIterator(); while(p.hasNext()) @@ -431,15 +508,26 @@ public class IncomingConnectionFactory extends EventHandler case StateClosed: { - // - // If we come from holding state, we first need to - // register again before we unregister. - // - if(_state == StateHolding) - { - registerWithPool(); - } - unregisterWithPool(); + if(_instance.threadPerConnection()) + { + // + // Connect to our own acceptor, which unblocks our + // thread per incoming connection factory stuck in accept(). + // + _acceptor.connectToSelf(); + } + else + { + // + // If we come from holding state, we first need to + // register again before we unregister. + // + if(_state == StateHolding) + { + registerWithPool(); + } + unregisterWithPool(); + } java.util.ListIterator p = _connections.listIterator(); while(p.hasNext()) @@ -458,9 +546,11 @@ public class IncomingConnectionFactory extends EventHandler private void registerWithPool() { + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. + if(_acceptor != null && !_registeredWithPool) { - ((Ice.ObjectAdapterI)_adapter).getThreadPool()._register(_acceptor.fd(), this); + _threadPool._register(_acceptor.fd(), this); _registeredWithPool = true; } } @@ -468,9 +558,11 @@ public class IncomingConnectionFactory extends EventHandler private void unregisterWithPool() { + assert(!_instance.threadPerConnection()); // Only for use with a thread pool. + if(_acceptor != null && _registeredWithPool) { - ((Ice.ObjectAdapterI)_adapter).getThreadPool().unregister(_acceptor.fd()); + _threadPool.unregister(_acceptor.fd()); _registeredWithPool = false; } } @@ -486,14 +578,160 @@ public class IncomingConnectionFactory extends EventHandler _instance.logger().warning(s); } + private void + error(String msg, Exception ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = msg + ":\n" + toString() + sw.toString(); + _instance.logger().error(s); + } + + private void + run() + { + assert(_acceptor != null); + + while(true) + { + // + // We must accept new connections outside the thread + // synchronization, because we use blocking accept. + // + Transceiver transceiver = null; + try + { + transceiver = _acceptor.accept(-1); + } + catch(Ice.SocketException ex) + { + // Ignore socket exceptions. + } + catch(Ice.TimeoutException ex) + { + // Ignore timeouts. + } + catch(Ice.LocalException ex) + { + // Warn about other Ice local exceptions. + if(_warn) + { + warning(ex); + } + } + + Ice.ConnectionI connection = null; + + synchronized(this) + { + while(_state == StateHolding) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + if(_state == StateClosed) + { + if(transceiver != null) + { + try + { + transceiver.close(); + } + catch(Ice.LocalException ex) + { + // Here we ignore any exceptions in close(). + } + } + + try + { + _acceptor.close(); + } + catch(Ice.LocalException ex) + { + _acceptor = null; + notifyAll(); + throw ex; + } + + _acceptor = null; + notifyAll(); + return; + } + + assert(_state == StateActive); + + // + // Reap connections for which destruction has completed. + // + java.util.ListIterator p = _connections.listIterator(); + while(p.hasNext()) + { + Ice.ConnectionI con = (Ice.ConnectionI)p.next(); + if(con.isFinished()) + { + p.remove(); + } + } + + // + // Create a connection object for the connection. + // + // In Java a keyboard interrupt causes accept() to raise a + // SocketException, therefore transceiver may be null. + // + if(transceiver != null) + { + connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter); + _connections.add(connection); + } + } + + // + // In thread per connection mode, the connection's thread will + // take care of connection validation. We don't want to block + // this thread waiting until validation is complete, because + // in contrast to thread pool mode, it is the only thread that + // can accept connections with this factory's + // acceptor. Therefore we don't call validate() in thread per + // connection mode. + // + } + } + + private class ThreadPerIncomingConnectionFactory extends Thread + { + public void + run() + { + try + { + IncomingConnectionFactory.this.run(); + } + catch(Exception ex) + { + IncomingConnectionFactory.this.error("exception in thread per incoming connection factory", ex); + } + } + } + private Thread _threadPerIncomingConnectionFactory; + private Acceptor _acceptor; private final Transceiver _transceiver; private Endpoint _endpoint; private final Ice.ObjectAdapter _adapter; - private ThreadPool _serverThreadPool; private boolean _registeredWithPool; + private ThreadPool _threadPool; private final boolean _warn; diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index 6fb99599217..18ae8266bc9 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -196,6 +196,18 @@ public class Instance return _serverThreadPool; } + public boolean + threadPerConnection() + { + return _threadPerConnection; + } + + public int + threadPerConnectionStackSize() + { + return _threadPerConnectionStackSize; + } + public synchronized EndpointFactoryManager endpointFactoryManager() { @@ -355,7 +367,7 @@ public class Instance _messageSizeMax = num * 1024; // Property is in kilobytes, _messageSizeMax in bytes } } - + { int num = _properties.getPropertyAsIntWithDefault("Ice.ConnectionIdleTime", 60); if(num < 0) @@ -367,7 +379,18 @@ public class Instance _connectionIdleTime = num; } } - + + _threadPerConnection = _properties.getPropertyAsInt("Ice.ThreadPerConnection") > 0; + + { + int stackSize = _properties.getPropertyAsInt("Ice.ThreadPerConnection.StackSize"); + if(stackSize < 0) + { + stackSize = 0; + } + _threadPerConnectionStackSize = stackSize; + } + _routerManager = new RouterManager(); _locatorManager = new LocatorManager(); @@ -621,6 +644,8 @@ public class Instance private ObjectAdapterFactory _objectAdapterFactory; private ThreadPool _clientThreadPool; private ThreadPool _serverThreadPool; + private final boolean _threadPerConnection; + private final int _threadPerConnectionStackSize; private EndpointFactoryManager _endpointFactoryManager; private Ice.PluginManager _pluginManager; private final BufferManager _bufferManager; // Immutable, not reset by destroy(). diff --git a/java/src/IceInternal/Network.java b/java/src/IceInternal/Network.java index 94577614268..eb58430452a 100644 --- a/java/src/IceInternal/Network.java +++ b/java/src/IceInternal/Network.java @@ -134,6 +134,21 @@ public final class Network } public static void + closeSocket(java.nio.channels.SelectableChannel fd) + { + try + { + fd.close(); + } + catch(java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } + } + + public static void setBlock(java.nio.channels.SelectableChannel fd, boolean block) { try diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 6e8bc384b24..270a3608a6c 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -490,16 +490,13 @@ public class OutgoingConnectionFactory while(p.hasNext()) { Ice.ConnectionI conn = (Ice.ConnectionI)p.next(); - if(conn.isValidated()) + try { - try - { - conn.flushBatchRequests(); - } - catch(Ice.LocalException ex) - { - // Ignore. - } + conn.flushBatchRequests(); + } + catch(Ice.LocalException ex) + { + // Ignore. } } } diff --git a/java/src/IceInternal/TcpAcceptor.java b/java/src/IceInternal/TcpAcceptor.java index ce564c76b1a..4142d4adc5d 100644 --- a/java/src/IceInternal/TcpAcceptor.java +++ b/java/src/IceInternal/TcpAcceptor.java @@ -68,6 +68,15 @@ class TcpAcceptor implements Acceptor return new TcpTransceiver(_instance, fd); } + public void + connectToSelf() + { + java.nio.channels.SocketChannel fd = Network.createTcpSocket(); + Network.setBlock(fd, false); + Network.doConnect(fd, _addr, -1); + Network.closeSocket(fd); + } + public String toString() { diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java index 69213bc3750..16eec36d42b 100644 --- a/java/src/IceInternal/TcpTransceiver.java +++ b/java/src/IceInternal/TcpTransceiver.java @@ -43,11 +43,11 @@ final class TcpTransceiver implements Transceiver } public void - shutdown() + shutdownWrite() { if(_traceLevels.network >= 2) { - String s = "shutting down tcp connection\n" + toString(); + String s = "shutting down tcp connection for writing\n" + toString(); _logger.trace(_traceLevels.networkCat, s); } @@ -66,6 +66,38 @@ final class TcpTransceiver implements Transceiver } public void + shutdownReadWrite() + { + if(_traceLevels.network >= 2) + { + String s = "shutting down tcp connection for reading and writing\n" + toString(); + _logger.trace(_traceLevels.networkCat, s); + } + + assert(_fd != null); + java.net.Socket socket = _fd.socket(); + try + { + // + // TODO: Java does not support SHUT_RDWR. Calling both + // shutdownInput and shutdownOutput results in an exception. + // + socket.shutdownInput(); // Shutdown socket for reading + //socket.shutdownOutput(); // Shutdown socket for writing + } + catch(java.net.SocketException ex) + { + // Ignore. + } + catch(java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } + } + + public void write(BasicStream stream, int timeout) { java.nio.ByteBuffer buf = stream.prepareWrite(); diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index f916ff256f5..1b83927d50f 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -33,6 +33,12 @@ public final class ThreadPool _promote = true; _warnUdp = _instance.properties().getPropertyAsInt("Ice.Warn.Datagrams") > 0; + // + // If we are in thread per connection mode, no thread pool should + // ever be created. + // + assert(!_instance.threadPerConnection()); + String programName = _instance.properties().getProperty("Ice.ProgramName"); if(programName.length() > 0) { diff --git a/java/src/IceInternal/Transceiver.java b/java/src/IceInternal/Transceiver.java index ea2ddda7b30..602e4648704 100644 --- a/java/src/IceInternal/Transceiver.java +++ b/java/src/IceInternal/Transceiver.java @@ -13,7 +13,8 @@ public interface Transceiver { java.nio.channels.SelectableChannel fd(); void close(); - void shutdown(); + void shutdownWrite(); + void shutdownReadWrite(); void write(BasicStream stream, int timeout); void read(BasicStream stream, int timeout); String type(); diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java index c77fb126cb7..aab3f4fbf08 100644 --- a/java/src/IceInternal/UdpTransceiver.java +++ b/java/src/IceInternal/UdpTransceiver.java @@ -39,7 +39,12 @@ final class UdpTransceiver implements Transceiver } public void - shutdown() + shutdownWrite() + { + } + + public void + shutdownReadWrite() { } @@ -95,12 +100,11 @@ final class UdpTransceiver implements Transceiver } public void - read(BasicStream stream, int timeout) + read(BasicStream stream, int timeout) // NOTE: timeout is ignored { - // TODO: Timeouts are ignored!! + assert(stream.pos() == 0); - assert(stream.pos() == 0); - final int packetSize = java.lang.Math.min(_maxPacketSize, _rcvSize - _udpOverhead); + final int packetSize = java.lang.Math.min(_maxPacketSize, _rcvSize - _udpOverhead); if(packetSize < stream.size()) { // @@ -153,11 +157,15 @@ final class UdpTransceiver implements Transceiver } else { + assert(_fd != null); try { - assert(_fd != null); _fd.receive(buf); ret = buf.position(); + if(ret == 0) + { + continue; + } } catch(java.io.InterruptedIOException ex) { |