summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorMark Spruiell <mes@zeroc.com>2004-12-08 18:59:41 +0000
committerMark Spruiell <mes@zeroc.com>2004-12-08 18:59:41 +0000
commitd821c757210e7f42bf78fdcd6699464786546654 (patch)
tree9ab20a747e779dfceeab9e4f896f17de6ad1ea43 /java/src
parentAdded regular expression matching to property checking code. (diff)
downloadice-d821c757210e7f42bf78fdcd6699464786546654.tar.bz2
ice-d821c757210e7f42bf78fdcd6699464786546654.tar.xz
ice-d821c757210e7f42bf78fdcd6699464786546654.zip
adding thread-per-connection
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/ConnectionI.java1103
-rw-r--r--java/src/IceInternal/Acceptor.java1
-rw-r--r--java/src/IceInternal/IncomingConnectionFactory.java378
-rw-r--r--java/src/IceInternal/Instance.java29
-rw-r--r--java/src/IceInternal/Network.java15
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java15
-rw-r--r--java/src/IceInternal/TcpAcceptor.java9
-rw-r--r--java/src/IceInternal/TcpTransceiver.java36
-rw-r--r--java/src/IceInternal/ThreadPool.java6
-rw-r--r--java/src/IceInternal/Transceiver.java3
-rw-r--r--java/src/IceInternal/UdpTransceiver.java20
11 files changed, 1197 insertions, 418 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index f02222732f6..c8b82685d6e 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -27,6 +27,34 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public synchronized void
validate()
{
+ if(_instance.threadPerConnection() && _threadPerConnection != Thread.currentThread())
+ {
+ //
+ // In thread per connection mode, this connection's thread
+ // will take care of connection validation. Therefore all we
+ // have to do here is to wait until this thread has completed
+ // validation.
+ //
+ while(_state == StateNotValidated)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ if(_state >= StateClosing)
+ {
+ assert(_exception != null);
+ throw _exception;
+ }
+
+ return;
+ }
+
assert(_state == StateNotValidated);
if(!_endpoint.datagram()) // Datagram connections are always implicitly validated.
@@ -42,10 +70,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// with respect to connection validation.
//
IceInternal.BasicStream os = new IceInternal.BasicStream(_instance);
- os.writeByte(IceInternal.Protocol.magic[0]);
- os.writeByte(IceInternal.Protocol.magic[1]);
- os.writeByte(IceInternal.Protocol.magic[2]);
- os.writeByte(IceInternal.Protocol.magic[3]);
+ os.writeBlob(IceInternal.Protocol.magic);
os.writeByte(IceInternal.Protocol.protocolMajor);
os.writeByte(IceInternal.Protocol.protocolMinor);
os.writeByte(IceInternal.Protocol.encodingMajor);
@@ -70,13 +95,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_transceiver.read(is, _endpoint.timeout());
assert(is.pos() == IceInternal.Protocol.headerSize);
is.pos(0);
- byte[] m = new byte[4];
- m[0] = is.readByte();
- m[1] = is.readByte();
- m[2] = is.readByte();
- m[3] = is.readByte();
- if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1]
- || m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
+ byte[] m = is.readBlob(4);
+ if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] ||
+ m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
{
BadMagicException ex = new BadMagicException();
ex.badMagic = m;
@@ -186,22 +207,31 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
public synchronized boolean
- isValidated()
- {
- return _state > StateNotValidated;
- }
-
- public synchronized boolean
isDestroyed()
{
return _state >= StateClosing;
}
- public synchronized boolean
+ public boolean
isFinished()
{
- if(_transceiver == null && _dispatchCount == 0)
+ Thread threadPerConnection = null;
+
+ synchronized(this)
{
+ if(_transceiver != null || _dispatchCount != 0 ||
+ (_threadPerConnection != null &&
+ _threadPerConnection != Thread.currentThread() &&
+ _threadPerConnection.isAlive()))
+ {
+ return false;
+ }
+
+ assert(_state == StateClosed);
+
+ threadPerConnection = _threadPerConnection;
+ _threadPerConnection = null;
+
//
// We must destroy the incoming cache. It is now not
// needed anymore.
@@ -214,13 +244,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_incomingCache = _incomingCache.next;
}
}
-
- return true;
}
- else
+
+ if(threadPerConnection != null && threadPerConnection != Thread.currentThread())
{
- return false;
+ try
+ {
+ threadPerConnection.join();
+ }
+ catch(InterruptedException ex)
+ {
+ }
}
+
+ return true;
}
public synchronized void
@@ -238,88 +275,107 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- public synchronized void
+ public void
waitUntilFinished()
{
- //
- // We wait indefinitely until connection closing has been
- // initiated. We also wait indefinitely until all outstanding
- // requests are completed. Otherwise we couldn't guarantee
- // that there are no outstanding calls when deactivate() is
- // called on the servant locators.
- //
- while(_state < StateClosing || _dispatchCount > 0)
+ Thread threadPerConnection = null;
+
+ synchronized(this)
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
+ //
+ // We wait indefinitely until connection closing has been
+ // initiated. We also wait indefinitely until all outstanding
+ // requests are completed. Otherwise we couldn't guarantee
+ // that there are no outstanding calls when deactivate() is
+ // called on the servant locators.
+ //
+ while(_state < StateClosing || _dispatchCount > 0)
{
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
}
- }
-
- //
- // Now we must wait until close() has been called on the
- // transceiver.
- //
- while(_transceiver != null)
- {
- try
+
+ //
+ // Now we must wait until close() has been called on the
+ // transceiver.
+ //
+ while(_transceiver != null)
{
- if(_state != StateClosed && _endpoint.timeout() >= 0)
+ try
{
- long absoluteWaitTime = _stateTime + _endpoint.timeout();
- long waitTime = absoluteWaitTime - System.currentTimeMillis();
-
- if(waitTime > 0)
+ if(_state != StateClosed && _endpoint.timeout() >= 0)
{
- //
- // We must wait a bit longer until we close
- // this connection.
- //
- wait(waitTime);
- if(System.currentTimeMillis() >= absoluteWaitTime)
+ long absoluteWaitTime = _stateTime + _endpoint.timeout();
+ long waitTime = absoluteWaitTime - System.currentTimeMillis();
+
+ if(waitTime > 0)
+ {
+ //
+ // We must wait a bit longer until we close this
+ // connection.
+ //
+ wait(waitTime);
+ if(System.currentTimeMillis() >= absoluteWaitTime)
+ {
+ setState(StateClosed, new CloseTimeoutException());
+ }
+ }
+ else
{
+ //
+ // We already waited long enough, so let's close this
+ // connection!
+ //
setState(StateClosed, new CloseTimeoutException());
}
+
+ //
+ // No return here, we must still wait until
+ // close() is called on the _transceiver.
+ //
}
else
{
- //
- // We already waited long enough, so let's
- // close this connection!
- //
- setState(StateClosed, new CloseTimeoutException());
+ wait();
}
-
- //
- // No return here, we must still wait until
- // close() is called on the _transceiver.
- //
}
- else
+ catch(InterruptedException ex)
{
- wait();
}
}
- catch(InterruptedException ex)
+
+ assert(_state == StateClosed);
+
+ threadPerConnection = _threadPerConnection;
+ _threadPerConnection = null;
+
+ //
+ // We must destroy the incoming cache. It is now not
+ // needed anymore.
+ //
+ synchronized(_incomingCacheMutex)
{
+ while(_incomingCache != null)
+ {
+ _incomingCache.__destroy();
+ _incomingCache = _incomingCache.next;
+ }
}
}
- assert(_state == StateClosed);
-
- //
- // We must destroy the incoming cache. It is now not
- // needed anymore.
- //
- synchronized(_incomingCacheMutex)
+ if(threadPerConnection != null && threadPerConnection != Thread.currentThread())
{
- while(_incomingCache != null)
+ try
+ {
+ threadPerConnection.join();
+ }
+ catch(InterruptedException ex)
{
- _incomingCache.__destroy();
- _incomingCache = _incomingCache.next;
}
}
}
@@ -388,56 +444,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
os.writeBlob(_requestHdr);
}
- private IceInternal.BasicStream
- doCompress(IceInternal.BasicStream uncompressed, boolean compress)
- {
- if(_compressionSupported)
- {
- if(compress && uncompressed.size() >= 100)
- {
- //
- // Do compression.
- //
- IceInternal.BasicStream cstream = uncompressed.compress(IceInternal.Protocol.headerSize);
- if(cstream != null)
- {
- //
- // Set compression status.
- //
- cstream.pos(9);
- cstream.writeByte((byte)2);
-
- //
- // Write the size of the compressed stream into the header.
- //
- cstream.pos(10);
- cstream.writeInt(cstream.size());
-
- //
- // Write the compression status and size of the compressed stream into the header of the
- // uncompressed stream -- we need this to trace requests correctly.
- //
- uncompressed.pos(9);
- uncompressed.writeByte((byte)2);
- uncompressed.writeInt(cstream.size());
-
- return cstream;
- }
- }
- }
-
- uncompressed.pos(9);
- uncompressed.writeByte((byte)((_compressionSupported && compress) ? 1 : 0));
-
- //
- // Not compressed, fill in the message size.
- //
- uncompressed.pos(10);
- uncompressed.writeInt(uncompressed.size());
-
- return uncompressed;
- }
-
public void
sendRequest(IceInternal.BasicStream os, IceInternal.Outgoing out, boolean compress)
{
@@ -1046,18 +1052,22 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public boolean
datagram()
{
- return _endpoint.datagram();
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
+ return _endpoint.datagram(); // No mutex protection necessary, _endpoint is immutable.
}
public boolean
readable()
{
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
return true;
}
public void
read(IceInternal.BasicStream stream)
{
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
+
_transceiver.read(stream, 0);
//
@@ -1086,13 +1096,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public void
message(IceInternal.BasicStream stream, IceInternal.ThreadPool threadPool)
{
- byte compress = 0;
- int requestId = 0;
- int invokeNum = 0;
- IceInternal.ServantManager servantManager = null;
- ObjectAdapter adapter = null;
- IceInternal.OutgoingAsync outAsync = null;
- boolean destroyStream = false;
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
+
+ MessageInfo info = new MessageInfo(stream);
synchronized(this)
{
@@ -1103,156 +1109,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
threadPool.promoteFollower();
- assert(_state > StateNotValidated);
-
- if(_state == StateClosed)
+ if(_state != StateClosed)
{
- return;
+ parseMessage(info);
}
- if(_acmTimeout > 0)
+ //
+ // parseMessage() can close the connection, so we must check
+ // for closed state again.
+ //
+ if(_state == StateClosed)
{
- _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ return;
}
-
- try
- {
- //
- // We don't need to check magic and version here. This
- // has already been done by the ThreadPool, which
- // provides us the stream.
- //
- assert(stream.pos() == stream.size());
- stream.pos(8);
- byte messageType = stream.readByte();
- compress = stream.readByte();
- if(compress == (byte)2)
- {
- if(_compressionSupported)
- {
- IceInternal.BasicStream ustream = stream.uncompress(IceInternal.Protocol.headerSize);
- if(ustream != stream)
- {
- destroyStream = true;
- stream = ustream;
- }
- }
- else
- {
- throw new CompressionNotSupportedException();
- }
- }
- stream.pos(IceInternal.Protocol.headerSize);
-
- switch(messageType)
- {
- case IceInternal.Protocol.closeConnectionMsg:
- {
- IceInternal.TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels);
- if(_endpoint.datagram() && _warn)
- {
- _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
- }
- else
- {
- setState(StateClosed, new CloseConnectionException());
- }
- break;
- }
-
- case IceInternal.Protocol.requestMsg:
- {
- if(_state == StateClosing)
- {
- IceInternal.TraceUtil.traceRequest("received request during closing\n" +
- "(ignored by server, client will retry)",
- stream, _logger, _traceLevels);
- }
- else
- {
- IceInternal.TraceUtil.traceRequest("received request", stream, _logger, _traceLevels);
- requestId = stream.readInt();
- invokeNum = 1;
- servantManager = _servantManager;
- adapter = _adapter;
- ++_dispatchCount;
- }
- break;
- }
-
- case IceInternal.Protocol.requestBatchMsg:
- {
- if(_state == StateClosing)
- {
- IceInternal.TraceUtil.traceBatchRequest("received batch request during closing\n" +
- "(ignored by server, client will retry)",
- stream, _logger, _traceLevels);
- }
- else
- {
- IceInternal.TraceUtil.traceBatchRequest("received batch request", stream, _logger,
- _traceLevels);
- invokeNum = stream.readInt();
- if(invokeNum < 0)
- {
- throw new NegativeSizeException();
- }
- servantManager = _servantManager;
- adapter = _adapter;
- _dispatchCount += invokeNum;
- }
- break;
- }
-
- case IceInternal.Protocol.replyMsg:
- {
- IceInternal.TraceUtil.traceReply("received reply", stream, _logger, _traceLevels);
- requestId = stream.readInt();
- IceInternal.Outgoing out = (IceInternal.Outgoing)_requests.remove(requestId);
- if(out != null)
- {
- out.finished(stream);
- }
- else
- {
- outAsync = (IceInternal.OutgoingAsync)_asyncRequests.remove(requestId);
- if(outAsync == null)
- {
- throw new UnknownRequestIdException();
- }
- }
- break;
- }
-
- case IceInternal.Protocol.validateConnectionMsg:
- {
- IceInternal.TraceUtil.traceHeader("received validate connection", stream, _logger,
- _traceLevels);
- if(_warn)
- {
- _logger.warning("ignoring unexpected validate connection message:\n" + _desc);
- }
- break;
- }
-
- default:
- {
- IceInternal.TraceUtil.traceHeader("received unknown message\n" +
- "(invalid, closing connection)", stream, _logger,
- _traceLevels);
- throw new UnknownMessageException();
- }
- }
- }
- catch(LocalException ex)
- {
- if(destroyStream)
- {
- stream.destroy();
- }
- setState(StateClosed, ex);
- return;
- }
}
try
@@ -1261,9 +1130,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// Asynchronous replies must be handled outside the thread
// synchronization, so that nested calls are possible.
//
- if(outAsync != null)
+ if(info.outAsync != null)
{
- outAsync.__finished(stream);
+ info.outAsync.__finished(stream);
}
//
@@ -1271,13 +1140,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// must be done outside the thread synchronization, so that nested
// calls are possible.
//
- invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter);
+ invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter);
}
finally
{
- if(destroyStream)
+ if(info.destroyStream)
{
- stream.destroy();
+ info.stream.destroy();
}
}
}
@@ -1285,6 +1154,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public void
finished(IceInternal.ThreadPool threadPool)
{
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
+
threadPool.promoteFollower();
LocalException exception = null;
@@ -1421,15 +1292,60 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_adapter != null)
{
- _threadPool = ((ObjectAdapterI)_adapter).getThreadPool();
_servantManager = ((ObjectAdapterI)_adapter).getServantManager();
}
else
{
- _threadPool = _instance.clientThreadPool();
_servantManager = null;
}
+ if(!_instance.threadPerConnection())
+ {
+ //
+ // Only set _threadPool if we really need it, i.e., if we are
+ // not in thread per connection mode. Thread pools have lazy
+ // initialization in Instance, and we don't want them to be
+ // created if they are not needed.
+ //
+ if(_adapter != null)
+ {
+ _threadPool = ((ObjectAdapterI)_adapter).getThreadPool();
+ }
+ else
+ {
+ _threadPool = _instance.clientThreadPool();
+ }
+ }
+ else
+ {
+ _threadPool = null;
+
+ //
+ // If we are in thread per connection mode, create the thread
+ // for this connection.
+ //
+ try
+ {
+ _threadPerConnection = new ThreadPerConnection();
+ _threadPerConnection.start();
+ }
+ catch(RuntimeException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "cannot create thread for connection:\n" + sw.toString();
+ _instance.logger().error(s);
+
+ _state = StateClosed;
+ _transceiver = null;
+ _threadPerConnection = null;
+
+ throw ex;
+ }
+ }
+
_overrideCompress = _instance.defaultsAndOverrides().overrideCompress;
_overrideCompressValue = _instance.defaultsAndOverrides().overrideCompressValue;
}
@@ -1441,6 +1357,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
assert(_state == StateClosed);
assert(_transceiver == null);
assert(_dispatchCount == 0);
+ assert(_threadPerConnection == null);
assert(_incomingCache == null);
_batchStream.destroy();
@@ -1537,7 +1454,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
return;
}
- registerWithPool();
+ if(!_instance.threadPerConnection())
+ {
+ registerWithPool();
+ }
break;
}
@@ -1551,7 +1471,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
return;
}
- unregisterWithPool();
+ if(!_instance.threadPerConnection())
+ {
+ unregisterWithPool();
+ }
break;
}
@@ -1564,25 +1487,36 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
return;
}
- registerWithPool(); // We need to continue to read in closing state.
+ if(!_instance.threadPerConnection())
+ {
+ registerWithPool(); // We need to continue to read in closing state.
+ }
break;
}
case StateClosed:
{
- //
- // If we change from not validated, we can close right
- // away. Otherwise we first must make sure that we are
- // registered, then we unregister, and let finished()
- // do the close.
- //
- if(_state == StateNotValidated)
+ if(_instance.threadPerConnection())
+ {
+ //
+ // If we are in thread per connection mode, we
+ // shutdown both for reading and writing. This will
+ // unblock and read call with an exception. The thread
+ // per connection then closes the transceiver.
+ //
+ _transceiver.shutdownReadWrite();
+ }
+ else if(_state == StateNotValidated)
{
+ //
+ // If we change from not validated, we can close right
+ // away.
+ //
assert(!_registeredWithPool);
//
- // We must make sure that nobody is sending when
- // we close the transceiver.
+ // We must make sure that nobody is sending when we
+ // close the transceiver.
//
synchronized(_sendMutex)
{
@@ -1601,16 +1535,40 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
else
{
+ //
+ // Otherwise we first must make sure that we are
+ // registered, then we unregister, and let finished()
+ // do the close.
+ //
registerWithPool();
unregisterWithPool();
}
-
break;
}
}
+ //
+ // We only register with the connection monitor if our new state
+ // is StateActive. Otherwise we unregister with the connection
+ // monitor, but only if we were registered before, i.e., if our
+ // old state was StateActive.
+ //
+ IceInternal.ConnectionMonitor connectionMonitor = _instance.connectionMonitor();
+ if(connectionMonitor != null)
+ {
+ if(state == StateActive)
+ {
+ connectionMonitor.add(this);
+ }
+ else if(_state == StateActive)
+ {
+ connectionMonitor.remove(this);
+ }
+ }
+
_state = state;
_stateTime = System.currentTimeMillis();
+
notifyAll();
if(_state == StateClosing && _dispatchCount == 0)
@@ -1641,10 +1599,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// message.
//
IceInternal.BasicStream os = new IceInternal.BasicStream(_instance);
- os.writeByte(IceInternal.Protocol.magic[0]);
- os.writeByte(IceInternal.Protocol.magic[1]);
- os.writeByte(IceInternal.Protocol.magic[2]);
- os.writeByte(IceInternal.Protocol.magic[3]);
+ os.writeBlob(IceInternal.Protocol.magic);
os.writeByte(IceInternal.Protocol.protocolMajor);
os.writeByte(IceInternal.Protocol.protocolMinor);
os.writeByte(IceInternal.Protocol.encodingMajor);
@@ -1658,7 +1613,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
IceInternal.TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels);
_transceiver.write(os, _endpoint.timeout());
- _transceiver.shutdown();
+ _transceiver.shutdownWrite();
}
}
}
@@ -1666,31 +1621,242 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private void
registerWithPool()
{
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
+
if(!_registeredWithPool)
{
_threadPool._register(_transceiver.fd(), this);
_registeredWithPool = true;
-
- IceInternal.ConnectionMonitor connectionMonitor = _instance.connectionMonitor();
- if(connectionMonitor != null)
- {
- connectionMonitor.add(this);
- }
}
}
private void
unregisterWithPool()
{
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
+
if(_registeredWithPool)
{
_threadPool.unregister(_transceiver.fd());
_registeredWithPool = false;
+ }
+ }
- IceInternal.ConnectionMonitor connectionMonitor = _instance.connectionMonitor();
- if(connectionMonitor != null)
+ private IceInternal.BasicStream
+ doCompress(IceInternal.BasicStream uncompressed, boolean compress)
+ {
+ if(_compressionSupported)
+ {
+ if(compress && uncompressed.size() >= 100)
{
- connectionMonitor.remove(this);
+ //
+ // Do compression.
+ //
+ IceInternal.BasicStream cstream = uncompressed.compress(IceInternal.Protocol.headerSize);
+ if(cstream != null)
+ {
+ //
+ // Set compression status.
+ //
+ cstream.pos(9);
+ cstream.writeByte((byte)2);
+
+ //
+ // Write the size of the compressed stream into the header.
+ //
+ cstream.pos(10);
+ cstream.writeInt(cstream.size());
+
+ //
+ // Write the compression status and size of the compressed stream into the header of the
+ // uncompressed stream -- we need this to trace requests correctly.
+ //
+ uncompressed.pos(9);
+ uncompressed.writeByte((byte)2);
+ uncompressed.writeInt(cstream.size());
+
+ return cstream;
+ }
+ }
+ }
+
+ uncompressed.pos(9);
+ uncompressed.writeByte((byte)((_compressionSupported && compress) ? 1 : 0));
+
+ //
+ // Not compressed, fill in the message size.
+ //
+ uncompressed.pos(10);
+ uncompressed.writeInt(uncompressed.size());
+
+ return uncompressed;
+ }
+
+ private static class MessageInfo
+ {
+ MessageInfo(IceInternal.BasicStream stream)
+ {
+ this.stream = stream;
+ }
+
+ IceInternal.BasicStream stream;
+ boolean destroyStream;
+ int invokeNum;
+ int requestId;
+ byte compress;
+ IceInternal.ServantManager servantManager;
+ ObjectAdapter adapter;
+ IceInternal.OutgoingAsync outAsync;
+ }
+
+ private void
+ parseMessage(MessageInfo info)
+ {
+ assert(_state > StateNotValidated && _state < StateClosed);
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ }
+
+ try
+ {
+ //
+ // We don't need to check magic and version here. This has
+ // already been done by the ThreadPool or ThreadPerConnection,
+ // which provides us with the stream.
+ //
+ assert(info.stream.pos() == info.stream.size());
+ info.stream.pos(8);
+ byte messageType = info.stream.readByte();
+ info.compress = info.stream.readByte();
+ if(info.compress == (byte)2)
+ {
+ if(_compressionSupported)
+ {
+ IceInternal.BasicStream ustream = info.stream.uncompress(IceInternal.Protocol.headerSize);
+ if(ustream != info.stream)
+ {
+ info.destroyStream = true;
+ info.stream = ustream;
+ }
+ }
+ else
+ {
+ throw new CompressionNotSupportedException();
+ }
+ }
+ info.stream.pos(IceInternal.Protocol.headerSize);
+
+ switch(messageType)
+ {
+ case IceInternal.Protocol.closeConnectionMsg:
+ {
+ IceInternal.TraceUtil.traceHeader("received close connection", info.stream, _logger, _traceLevels);
+ if(_endpoint.datagram() && _warn)
+ {
+ _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
+ }
+ else
+ {
+ setState(StateClosed, new CloseConnectionException());
+ }
+ break;
+ }
+
+ case IceInternal.Protocol.requestMsg:
+ {
+ if(_state == StateClosing)
+ {
+ IceInternal.TraceUtil.traceRequest("received request during closing\n" +
+ "(ignored by server, client will retry)",
+ info.stream, _logger, _traceLevels);
+ }
+ else
+ {
+ IceInternal.TraceUtil.traceRequest("received request", info.stream, _logger, _traceLevels);
+ info.requestId = info.stream.readInt();
+ info.invokeNum = 1;
+ info.servantManager = _servantManager;
+ info.adapter = _adapter;
+ ++_dispatchCount;
+ }
+ break;
+ }
+
+ case IceInternal.Protocol.requestBatchMsg:
+ {
+ if(_state == StateClosing)
+ {
+ IceInternal.TraceUtil.traceBatchRequest("received batch request during closing\n" +
+ "(ignored by server, client will retry)",
+ info.stream, _logger, _traceLevels);
+ }
+ else
+ {
+ IceInternal.TraceUtil.traceBatchRequest("received batch request", info.stream, _logger,
+ _traceLevels);
+ info.invokeNum = info.stream.readInt();
+ if(info.invokeNum < 0)
+ {
+ info.invokeNum = 0;
+ throw new NegativeSizeException();
+ }
+ info.servantManager = _servantManager;
+ info.adapter = _adapter;
+ _dispatchCount += info.invokeNum;
+ }
+ break;
+ }
+
+ case IceInternal.Protocol.replyMsg:
+ {
+ IceInternal.TraceUtil.traceReply("received reply", info.stream, _logger, _traceLevels);
+ info.requestId = info.stream.readInt();
+ IceInternal.Outgoing out = (IceInternal.Outgoing)_requests.remove(info.requestId);
+ if(out != null)
+ {
+ out.finished(info.stream);
+ }
+ else
+ {
+ info.outAsync = (IceInternal.OutgoingAsync)_asyncRequests.remove(info.requestId);
+ if(info.outAsync == null)
+ {
+ throw new UnknownRequestIdException();
+ }
+ }
+ break;
+ }
+
+ case IceInternal.Protocol.validateConnectionMsg:
+ {
+ IceInternal.TraceUtil.traceHeader("received validate connection", info.stream, _logger,
+ _traceLevels);
+ if(_warn)
+ {
+ _logger.warning("ignoring unexpected validate connection message:\n" + _desc);
+ }
+ break;
+ }
+
+ default:
+ {
+ IceInternal.TraceUtil.traceHeader("received unknown message\n" +
+ "(invalid, closing connection)", info.stream, _logger,
+ _traceLevels);
+ throw new UnknownMessageException();
+ }
+ }
+ }
+ catch(LocalException ex)
+ {
+ setState(StateClosed, ex);
+
+ if(info.destroyStream)
+ {
+ info.stream.destroy();
+ info.destroyStream = false;
}
}
}
@@ -1810,26 +1976,289 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
private void
+ run()
+ {
+ try
+ {
+ //
+ // First we must validate and activate this connection. This must
+ // be done here, and not in the connection factory. Please see the
+ // comments in the connection factory for details.
+ //
+ validate();
+ }
+ catch(LocalException ex)
+ {
+ synchronized(this)
+ {
+ assert(_state == StateClosed);
+
+ //
+ // We must make sure that nobody is sending when we close the
+ // transceiver.
+ //
+ synchronized(_sendMutex)
+ {
+ try
+ {
+ _transceiver.close();
+ }
+ catch(LocalException e)
+ {
+ // Here we ignore any exceptions in close().
+ }
+
+ _transceiver = null;
+ notifyAll();
+ }
+ }
+ return;
+ }
+
+ activate();
+
+ boolean warnUdp = _instance.properties().getPropertyAsInt("Ice.Warn.Datagrams") > 0;
+
+ boolean closed = false;
+
+ while(!closed)
+ {
+ //
+ // We must accept new connections outside the thread
+ // synchronization, because we use blocking accept.
+ //
+
+ IceInternal.BasicStream stream = new IceInternal.BasicStream(_instance);
+
+ try
+ {
+ stream.resize(IceInternal.Protocol.headerSize, true);
+ stream.pos(0);
+ _transceiver.read(stream, -1);
+
+ int pos = stream.pos();
+ assert(pos >= IceInternal.Protocol.headerSize);
+ stream.pos(0);
+ byte[] m = stream.readBlob(4);
+ if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] ||
+ m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
+ {
+ BadMagicException ex = new BadMagicException();
+ ex.badMagic = m;
+ throw ex;
+ }
+ byte pMajor = stream.readByte();
+ byte pMinor = stream.readByte();
+ if(pMajor != IceInternal.Protocol.protocolMajor)
+ {
+ UnsupportedProtocolException e = new UnsupportedProtocolException();
+ e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor;
+ e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor;
+ e.major = IceInternal.Protocol.protocolMajor;
+ e.minor = IceInternal.Protocol.protocolMinor;
+ throw e;
+ }
+ byte eMajor = stream.readByte();
+ byte eMinor = stream.readByte();
+ if(eMajor != IceInternal.Protocol.encodingMajor)
+ {
+ UnsupportedEncodingException e = new UnsupportedEncodingException();
+ e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor;
+ e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor;
+ e.major = IceInternal.Protocol.encodingMajor;
+ e.minor = IceInternal.Protocol.encodingMinor;
+ throw e;
+ }
+ byte messageType = stream.readByte();
+ byte compress = stream.readByte();
+ int size = stream.readInt();
+ if(size < IceInternal.Protocol.headerSize)
+ {
+ throw new IllegalMessageSizeException();
+ }
+ if(size > _instance.messageSizeMax())
+ {
+ throw new MemoryLimitException();
+ }
+ if(size > stream.size())
+ {
+ stream.resize(size, true);
+ }
+ stream.pos(pos);
+
+ if(pos != stream.size())
+ {
+ if(_endpoint.datagram())
+ {
+ if(warnUdp)
+ {
+ _logger.warning("DatagramLimitException: maximum size of " + pos + " exceeded");
+ }
+ throw new DatagramLimitException();
+ }
+ else
+ {
+ _transceiver.read(stream, -1);
+ assert(stream.pos() == stream.size());
+ }
+ }
+ }
+ catch(DatagramLimitException ex) // Expected.
+ {
+ continue;
+ }
+ catch(LocalException ex)
+ {
+ exception(ex);
+ }
+
+ MessageInfo info = new MessageInfo(stream);
+
+ LocalException exception = null;
+
+ IceInternal.IntMap requests = null;
+ IceInternal.IntMap asyncRequests = null;
+
+ try
+ {
+ synchronized(this)
+ {
+ while(_state == StateHolding)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ if(_state != StateClosed)
+ {
+ parseMessage(info);
+ }
+
+ //
+ // parseMessage() can close the connection, so we must
+ // check for closed state again.
+ //
+ if(_state == StateClosed)
+ {
+ //
+ // We must make sure that nobody is sending when we close
+ // the transceiver.
+ //
+ synchronized(_sendMutex)
+ {
+ try
+ {
+ _transceiver.close();
+ }
+ catch(LocalException ex)
+ {
+ exception = ex;
+ }
+
+ _transceiver = null;
+ notifyAll();
+ }
+
+ //
+ // We cannot simply return here. We have to make sure
+ // that all requests (regular and async) are notified
+ // about the closed connection below.
+ //
+ closed = true;
+ }
+
+ if(_state == StateClosed || _state == StateClosing)
+ {
+ requests = _requests;
+ _requests = new IceInternal.IntMap();
+
+ asyncRequests = _asyncRequests;
+ _asyncRequests = new IceInternal.IntMap();
+ }
+ }
+
+ //
+ // Asynchronous replies must be handled outside the thread
+ // synchronization, so that nested calls are possible.
+ //
+ if(info.outAsync != null)
+ {
+ info.outAsync.__finished(info.stream);
+ }
+
+ //
+ // Method invocation (or multiple invocations for batch messages)
+ // must be done outside the thread synchronization, so that nested
+ // calls are possible.
+ //
+ invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager,
+ info.adapter);
+
+ if(requests != null)
+ {
+ java.util.Iterator i = requests.entryIterator();
+ while(i.hasNext())
+ {
+ IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next();
+ IceInternal.Outgoing out = (IceInternal.Outgoing)e.getValue();
+ out.finished(_exception); // The exception is immutable at this point.
+ }
+ }
+
+ if(asyncRequests != null)
+ {
+ java.util.Iterator i = asyncRequests.entryIterator();
+ while(i.hasNext())
+ {
+ IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next();
+ IceInternal.OutgoingAsync out = (IceInternal.OutgoingAsync)e.getValue();
+ out.__finished(_exception); // The exception is immutable at this point.
+ }
+ }
+
+ if(exception != null)
+ {
+ assert(closed);
+ throw exception;
+ }
+ }
+ finally
+ {
+ if(info.destroyStream)
+ {
+ info.stream.destroy();
+ }
+ }
+ }
+ }
+
+ private void
warning(String msg, Exception ex)
{
java.io.StringWriter sw = new java.io.StringWriter();
java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- Throwable t = ex;
- do
- {
- t.printStackTrace(pw);
- t = t.getCause();
- if(t != null)
- {
- pw.println("Caused by:\n");
- }
- }
- while(t != null);
+ ex.printStackTrace(pw);
pw.flush();
String s = msg + ":\n" + sw.toString() + _desc;
_logger.warning(s);
}
+ private void
+ error(String msg, Exception ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = msg + ":\n" + _desc + sw.toString();
+ _logger.error(s);
+ }
+
private IceInternal.Incoming
getIncoming(ObjectAdapter adapter, boolean response, byte compress)
{
@@ -1863,6 +2292,23 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
+ private class ThreadPerConnection extends Thread
+ {
+ public void
+ run()
+ {
+ try
+ {
+ ConnectionI.this.run();
+ }
+ catch(Exception ex)
+ {
+ ConnectionI.this.error("exception in thread per connection", ex);
+ }
+ }
+ }
+ private Thread _threadPerConnection;
+
private IceInternal.Transceiver _transceiver;
private final String _desc;
private final String _type;
@@ -1883,6 +2329,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private long _acmAbsoluteTimeoutMillis;
private int _nextRequestId;
+
private IceInternal.IntMap _requests = new IceInternal.IntMap();
private IceInternal.IntMap _asyncRequests = new IceInternal.IntMap();
diff --git a/java/src/IceInternal/Acceptor.java b/java/src/IceInternal/Acceptor.java
index 8d021c62c00..b61c5f1163a 100644
--- a/java/src/IceInternal/Acceptor.java
+++ b/java/src/IceInternal/Acceptor.java
@@ -15,5 +15,6 @@ public interface Acceptor
void close();
void listen();
Transceiver accept(int timeout);
+ void connectToSelf();
String toString();
}
diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java
index 399573dbb55..fece82a38e4 100644
--- a/java/src/IceInternal/IncomingConnectionFactory.java
+++ b/java/src/IceInternal/IncomingConnectionFactory.java
@@ -37,8 +37,8 @@ public class IncomingConnectionFactory extends EventHandler
synchronized(this)
{
//
- // First we wait until the connection factory itself is in
- // holding state.
+ // First we wait until the connection factory itself is in holding
+ // state.
//
while(_state < StateHolding)
{
@@ -72,6 +72,7 @@ public class IncomingConnectionFactory extends EventHandler
public void
waitUntilFinished()
{
+ Thread threadPerIncomingConnectionFactory = null;
java.util.LinkedList connections;
synchronized(this)
@@ -89,19 +90,32 @@ public class IncomingConnectionFactory extends EventHandler
{
}
}
-
+
+ assert(_state == StateClosed);
+
+ threadPerIncomingConnectionFactory = _threadPerIncomingConnectionFactory;
+ _threadPerIncomingConnectionFactory = null;
+
//
- // We want to wait until all connections are finished
- // outside the thread synchronization.
+ // We want to wait until all connections are finished outside the
+ // thread synchronization.
//
connections = _connections;
_connections = new java.util.LinkedList();
}
-
- //
- // Now we wait for until the destruction of each connection is
- // finished.
- //
+
+ if(threadPerIncomingConnectionFactory != null &&
+ threadPerIncomingConnectionFactory != Thread.currentThread())
+ {
+ try
+ {
+ threadPerIncomingConnectionFactory.join();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
java.util.ListIterator p = connections.listIterator();
while(p.hasNext())
{
@@ -176,92 +190,101 @@ public class IncomingConnectionFactory extends EventHandler
public boolean
datagram()
{
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
return _endpoint.datagram();
}
public boolean
readable()
{
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
return false;
}
public void
read(BasicStream unused)
{
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
assert(false); // Must not be called.
}
public void
message(BasicStream unused, ThreadPool threadPool)
{
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
+
Ice.ConnectionI connection = null;
synchronized(this)
{
- if(_state != StateActive)
+ try
{
- Thread.yield();
- threadPool.promoteFollower();
- return;
- }
+ if(_state != StateActive)
+ {
+ Thread.yield();
+ return;
+ }
- //
- // Reap connections for which destruction has completed.
- //
- java.util.ListIterator p = _connections.listIterator();
- while(p.hasNext())
- {
- Ice.ConnectionI con = (Ice.ConnectionI)p.next();
- if(con.isFinished())
+ //
+ // Reap connections for which destruction has completed.
+ //
+ java.util.ListIterator p = _connections.listIterator();
+ while(p.hasNext())
{
- p.remove();
+ Ice.ConnectionI con = (Ice.ConnectionI)p.next();
+ if(con.isFinished())
+ {
+ p.remove();
+ }
}
- }
- //
- // Now accept a new connection.
- //
- Transceiver transceiver;
- try
- {
- transceiver = _acceptor.accept(0);
- }
- catch(Ice.TimeoutException ex)
- {
- // Ignore timeouts.
- return;
- }
- catch(Ice.LocalException ex)
- {
- // Warn about other Ice local exceptions.
- if(_warn)
+ //
+ // Now accept a new connection.
+ //
+ Transceiver transceiver;
+ try
{
- warning(ex);
+ transceiver = _acceptor.accept(0);
}
- return;
+ catch(Ice.TimeoutException ex)
+ {
+ // Ignore timeouts.
+ return;
+ }
+ catch(Ice.LocalException ex)
+ {
+ // Warn about other Ice local exceptions.
+ if(_warn)
+ {
+ warning(ex);
+ }
+ return;
+ }
+
+ assert(transceiver != null);
+
+ //
+ // Create a connection object for the connection.
+ //
+ connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter);
+ _connections.add(connection);
}
finally
{
//
- // We must promote a follower after we accepted a new
- // connection, or after an exception.
+ // This makes sure that we promote a follower before we leave
+ // the scope of the mutex above, but after we call accept()
+ // (if we call it).
//
threadPool.promoteFollower();
}
-
- //
- // Create a connection object for the connection.
- //
- assert(transceiver != null);
- connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter);
- _connections.add(connection);
}
assert(connection != null);
//
- // We validate and activate outside the thread
- // synchronization, to not block the factory.
+ // We validate and activate outside the thread synchronization, to not block
+ // the factory.
//
try
{
@@ -282,6 +305,8 @@ public class IncomingConnectionFactory extends EventHandler
public synchronized void
finished(ThreadPool threadPool)
{
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
+
threadPool.promoteFollower();
if(_state == StateActive)
@@ -344,7 +369,18 @@ public class IncomingConnectionFactory extends EventHandler
{
_endpoint = h.value;
Ice.ConnectionI connection = new Ice.ConnectionI(_instance, _transceiver, _endpoint, _adapter);
- connection.validate();
+
+ //
+ // In thread per connection mode, the connection's thread will
+ // take care of connection validation, and we don't want to
+ // block here waiting until validation is complete. Therefore
+ // we don't call validate() in thread per connection mode.
+ //
+ if(!_instance.threadPerConnection())
+ {
+ connection.validate();
+ }
+
_connections.add(connection);
}
else
@@ -354,6 +390,40 @@ public class IncomingConnectionFactory extends EventHandler
_endpoint = h.value;
assert(_acceptor != null);
_acceptor.listen();
+
+ if(!_instance.threadPerConnection())
+ {
+ //
+ // Only set _threadPool if we really need it, i.e., if we are
+ // not in thread per connection mode. Thread pools have lazy
+ // initialization in Instance, and we don't want them to be
+ // created if they are not needed.
+ //
+ _threadPool = ((Ice.ObjectAdapterI)_adapter).getThreadPool();
+ }
+ else
+ {
+ //
+ // If we are in thread per connection mode, we also use
+ // one thread per incoming connection factory, that
+ // accepts new connections on this endpoint.
+ //
+ try
+ {
+ _threadPerIncomingConnectionFactory = new ThreadPerIncomingConnectionFactory();
+ _threadPerIncomingConnectionFactory.start();
+ }
+ catch(RuntimeException ex)
+ {
+ error("cannot create thread for incoming connection factory", ex);
+
+ _state = StateClosed;
+ _acceptor = null;
+ _threadPerIncomingConnectionFactory = null;
+
+ throw ex;
+ }
+ }
}
}
catch(RuntimeException ex)
@@ -371,6 +441,7 @@ public class IncomingConnectionFactory extends EventHandler
assert(_state == StateClosed);
assert(_acceptor == null);
assert(_connections.size() == 0);
+ assert(_threadPerIncomingConnectionFactory == null);
//
// Destroy the EventHandler's stream, so that its buffer
@@ -401,7 +472,10 @@ public class IncomingConnectionFactory extends EventHandler
{
return;
}
- registerWithPool();
+ if(!_instance.threadPerConnection())
+ {
+ registerWithPool();
+ }
java.util.ListIterator p = _connections.listIterator();
while(p.hasNext())
@@ -418,7 +492,10 @@ public class IncomingConnectionFactory extends EventHandler
{
return;
}
- unregisterWithPool();
+ if(!_instance.threadPerConnection())
+ {
+ unregisterWithPool();
+ }
java.util.ListIterator p = _connections.listIterator();
while(p.hasNext())
@@ -431,15 +508,26 @@ public class IncomingConnectionFactory extends EventHandler
case StateClosed:
{
- //
- // If we come from holding state, we first need to
- // register again before we unregister.
- //
- if(_state == StateHolding)
- {
- registerWithPool();
- }
- unregisterWithPool();
+ if(_instance.threadPerConnection())
+ {
+ //
+ // Connect to our own acceptor, which unblocks our
+ // thread per incoming connection factory stuck in accept().
+ //
+ _acceptor.connectToSelf();
+ }
+ else
+ {
+ //
+ // If we come from holding state, we first need to
+ // register again before we unregister.
+ //
+ if(_state == StateHolding)
+ {
+ registerWithPool();
+ }
+ unregisterWithPool();
+ }
java.util.ListIterator p = _connections.listIterator();
while(p.hasNext())
@@ -458,9 +546,11 @@ public class IncomingConnectionFactory extends EventHandler
private void
registerWithPool()
{
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
+
if(_acceptor != null && !_registeredWithPool)
{
- ((Ice.ObjectAdapterI)_adapter).getThreadPool()._register(_acceptor.fd(), this);
+ _threadPool._register(_acceptor.fd(), this);
_registeredWithPool = true;
}
}
@@ -468,9 +558,11 @@ public class IncomingConnectionFactory extends EventHandler
private void
unregisterWithPool()
{
+ assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
+
if(_acceptor != null && _registeredWithPool)
{
- ((Ice.ObjectAdapterI)_adapter).getThreadPool().unregister(_acceptor.fd());
+ _threadPool.unregister(_acceptor.fd());
_registeredWithPool = false;
}
}
@@ -486,14 +578,160 @@ public class IncomingConnectionFactory extends EventHandler
_instance.logger().warning(s);
}
+ private void
+ error(String msg, Exception ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = msg + ":\n" + toString() + sw.toString();
+ _instance.logger().error(s);
+ }
+
+ private void
+ run()
+ {
+ assert(_acceptor != null);
+
+ while(true)
+ {
+ //
+ // We must accept new connections outside the thread
+ // synchronization, because we use blocking accept.
+ //
+ Transceiver transceiver = null;
+ try
+ {
+ transceiver = _acceptor.accept(-1);
+ }
+ catch(Ice.SocketException ex)
+ {
+ // Ignore socket exceptions.
+ }
+ catch(Ice.TimeoutException ex)
+ {
+ // Ignore timeouts.
+ }
+ catch(Ice.LocalException ex)
+ {
+ // Warn about other Ice local exceptions.
+ if(_warn)
+ {
+ warning(ex);
+ }
+ }
+
+ Ice.ConnectionI connection = null;
+
+ synchronized(this)
+ {
+ while(_state == StateHolding)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ if(_state == StateClosed)
+ {
+ if(transceiver != null)
+ {
+ try
+ {
+ transceiver.close();
+ }
+ catch(Ice.LocalException ex)
+ {
+ // Here we ignore any exceptions in close().
+ }
+ }
+
+ try
+ {
+ _acceptor.close();
+ }
+ catch(Ice.LocalException ex)
+ {
+ _acceptor = null;
+ notifyAll();
+ throw ex;
+ }
+
+ _acceptor = null;
+ notifyAll();
+ return;
+ }
+
+ assert(_state == StateActive);
+
+ //
+ // Reap connections for which destruction has completed.
+ //
+ java.util.ListIterator p = _connections.listIterator();
+ while(p.hasNext())
+ {
+ Ice.ConnectionI con = (Ice.ConnectionI)p.next();
+ if(con.isFinished())
+ {
+ p.remove();
+ }
+ }
+
+ //
+ // Create a connection object for the connection.
+ //
+ // In Java a keyboard interrupt causes accept() to raise a
+ // SocketException, therefore transceiver may be null.
+ //
+ if(transceiver != null)
+ {
+ connection = new Ice.ConnectionI(_instance, transceiver, _endpoint, _adapter);
+ _connections.add(connection);
+ }
+ }
+
+ //
+ // In thread per connection mode, the connection's thread will
+ // take care of connection validation. We don't want to block
+ // this thread waiting until validation is complete, because
+ // in contrast to thread pool mode, it is the only thread that
+ // can accept connections with this factory's
+ // acceptor. Therefore we don't call validate() in thread per
+ // connection mode.
+ //
+ }
+ }
+
+ private class ThreadPerIncomingConnectionFactory extends Thread
+ {
+ public void
+ run()
+ {
+ try
+ {
+ IncomingConnectionFactory.this.run();
+ }
+ catch(Exception ex)
+ {
+ IncomingConnectionFactory.this.error("exception in thread per incoming connection factory", ex);
+ }
+ }
+ }
+ private Thread _threadPerIncomingConnectionFactory;
+
private Acceptor _acceptor;
private final Transceiver _transceiver;
private Endpoint _endpoint;
private final Ice.ObjectAdapter _adapter;
- private ThreadPool _serverThreadPool;
private boolean _registeredWithPool;
+ private ThreadPool _threadPool;
private final boolean _warn;
diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java
index 6fb99599217..18ae8266bc9 100644
--- a/java/src/IceInternal/Instance.java
+++ b/java/src/IceInternal/Instance.java
@@ -196,6 +196,18 @@ public class Instance
return _serverThreadPool;
}
+ public boolean
+ threadPerConnection()
+ {
+ return _threadPerConnection;
+ }
+
+ public int
+ threadPerConnectionStackSize()
+ {
+ return _threadPerConnectionStackSize;
+ }
+
public synchronized EndpointFactoryManager
endpointFactoryManager()
{
@@ -355,7 +367,7 @@ public class Instance
_messageSizeMax = num * 1024; // Property is in kilobytes, _messageSizeMax in bytes
}
}
-
+
{
int num = _properties.getPropertyAsIntWithDefault("Ice.ConnectionIdleTime", 60);
if(num < 0)
@@ -367,7 +379,18 @@ public class Instance
_connectionIdleTime = num;
}
}
-
+
+ _threadPerConnection = _properties.getPropertyAsInt("Ice.ThreadPerConnection") > 0;
+
+ {
+ int stackSize = _properties.getPropertyAsInt("Ice.ThreadPerConnection.StackSize");
+ if(stackSize < 0)
+ {
+ stackSize = 0;
+ }
+ _threadPerConnectionStackSize = stackSize;
+ }
+
_routerManager = new RouterManager();
_locatorManager = new LocatorManager();
@@ -621,6 +644,8 @@ public class Instance
private ObjectAdapterFactory _objectAdapterFactory;
private ThreadPool _clientThreadPool;
private ThreadPool _serverThreadPool;
+ private final boolean _threadPerConnection;
+ private final int _threadPerConnectionStackSize;
private EndpointFactoryManager _endpointFactoryManager;
private Ice.PluginManager _pluginManager;
private final BufferManager _bufferManager; // Immutable, not reset by destroy().
diff --git a/java/src/IceInternal/Network.java b/java/src/IceInternal/Network.java
index 94577614268..eb58430452a 100644
--- a/java/src/IceInternal/Network.java
+++ b/java/src/IceInternal/Network.java
@@ -134,6 +134,21 @@ public final class Network
}
public static void
+ closeSocket(java.nio.channels.SelectableChannel fd)
+ {
+ try
+ {
+ fd.close();
+ }
+ catch(java.io.IOException ex)
+ {
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
+ }
+ }
+
+ public static void
setBlock(java.nio.channels.SelectableChannel fd, boolean block)
{
try
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 6e8bc384b24..270a3608a6c 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -490,16 +490,13 @@ public class OutgoingConnectionFactory
while(p.hasNext())
{
Ice.ConnectionI conn = (Ice.ConnectionI)p.next();
- if(conn.isValidated())
+ try
{
- try
- {
- conn.flushBatchRequests();
- }
- catch(Ice.LocalException ex)
- {
- // Ignore.
- }
+ conn.flushBatchRequests();
+ }
+ catch(Ice.LocalException ex)
+ {
+ // Ignore.
}
}
}
diff --git a/java/src/IceInternal/TcpAcceptor.java b/java/src/IceInternal/TcpAcceptor.java
index ce564c76b1a..4142d4adc5d 100644
--- a/java/src/IceInternal/TcpAcceptor.java
+++ b/java/src/IceInternal/TcpAcceptor.java
@@ -68,6 +68,15 @@ class TcpAcceptor implements Acceptor
return new TcpTransceiver(_instance, fd);
}
+ public void
+ connectToSelf()
+ {
+ java.nio.channels.SocketChannel fd = Network.createTcpSocket();
+ Network.setBlock(fd, false);
+ Network.doConnect(fd, _addr, -1);
+ Network.closeSocket(fd);
+ }
+
public String
toString()
{
diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java
index 69213bc3750..16eec36d42b 100644
--- a/java/src/IceInternal/TcpTransceiver.java
+++ b/java/src/IceInternal/TcpTransceiver.java
@@ -43,11 +43,11 @@ final class TcpTransceiver implements Transceiver
}
public void
- shutdown()
+ shutdownWrite()
{
if(_traceLevels.network >= 2)
{
- String s = "shutting down tcp connection\n" + toString();
+ String s = "shutting down tcp connection for writing\n" + toString();
_logger.trace(_traceLevels.networkCat, s);
}
@@ -66,6 +66,38 @@ final class TcpTransceiver implements Transceiver
}
public void
+ shutdownReadWrite()
+ {
+ if(_traceLevels.network >= 2)
+ {
+ String s = "shutting down tcp connection for reading and writing\n" + toString();
+ _logger.trace(_traceLevels.networkCat, s);
+ }
+
+ assert(_fd != null);
+ java.net.Socket socket = _fd.socket();
+ try
+ {
+ //
+ // TODO: Java does not support SHUT_RDWR. Calling both
+ // shutdownInput and shutdownOutput results in an exception.
+ //
+ socket.shutdownInput(); // Shutdown socket for reading
+ //socket.shutdownOutput(); // Shutdown socket for writing
+ }
+ catch(java.net.SocketException ex)
+ {
+ // Ignore.
+ }
+ catch(java.io.IOException ex)
+ {
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
+ }
+ }
+
+ public void
write(BasicStream stream, int timeout)
{
java.nio.ByteBuffer buf = stream.prepareWrite();
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index f916ff256f5..1b83927d50f 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -33,6 +33,12 @@ public final class ThreadPool
_promote = true;
_warnUdp = _instance.properties().getPropertyAsInt("Ice.Warn.Datagrams") > 0;
+ //
+ // If we are in thread per connection mode, no thread pool should
+ // ever be created.
+ //
+ assert(!_instance.threadPerConnection());
+
String programName = _instance.properties().getProperty("Ice.ProgramName");
if(programName.length() > 0)
{
diff --git a/java/src/IceInternal/Transceiver.java b/java/src/IceInternal/Transceiver.java
index ea2ddda7b30..602e4648704 100644
--- a/java/src/IceInternal/Transceiver.java
+++ b/java/src/IceInternal/Transceiver.java
@@ -13,7 +13,8 @@ public interface Transceiver
{
java.nio.channels.SelectableChannel fd();
void close();
- void shutdown();
+ void shutdownWrite();
+ void shutdownReadWrite();
void write(BasicStream stream, int timeout);
void read(BasicStream stream, int timeout);
String type();
diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java
index c77fb126cb7..aab3f4fbf08 100644
--- a/java/src/IceInternal/UdpTransceiver.java
+++ b/java/src/IceInternal/UdpTransceiver.java
@@ -39,7 +39,12 @@ final class UdpTransceiver implements Transceiver
}
public void
- shutdown()
+ shutdownWrite()
+ {
+ }
+
+ public void
+ shutdownReadWrite()
{
}
@@ -95,12 +100,11 @@ final class UdpTransceiver implements Transceiver
}
public void
- read(BasicStream stream, int timeout)
+ read(BasicStream stream, int timeout) // NOTE: timeout is ignored
{
- // TODO: Timeouts are ignored!!
+ assert(stream.pos() == 0);
- assert(stream.pos() == 0);
- final int packetSize = java.lang.Math.min(_maxPacketSize, _rcvSize - _udpOverhead);
+ final int packetSize = java.lang.Math.min(_maxPacketSize, _rcvSize - _udpOverhead);
if(packetSize < stream.size())
{
//
@@ -153,11 +157,15 @@ final class UdpTransceiver implements Transceiver
}
else
{
+ assert(_fd != null);
try
{
- assert(_fd != null);
_fd.receive(buf);
ret = buf.position();
+ if(ret == 0)
+ {
+ continue;
+ }
}
catch(java.io.InterruptedIOException ex)
{