diff options
author | Marc Laukien <marc@zeroc.com> | 2004-02-20 17:44:41 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2004-02-20 17:44:41 +0000 |
commit | f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad (patch) | |
tree | 520786ae72c4376b505f21f8adf9f5ea522cf9bf /java/src/IceInternal/Connection.java | |
parent | Win32 fixes (diff) | |
download | ice-f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad.tar.bz2 ice-f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad.tar.xz ice-f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad.zip |
C++ -> Java
Diffstat (limited to 'java/src/IceInternal/Connection.java')
-rw-r--r-- | java/src/IceInternal/Connection.java | 741 |
1 files changed, 412 insertions, 329 deletions
diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java index bce9f51a5dd..b9919b9adb9 100644 --- a/java/src/IceInternal/Connection.java +++ b/java/src/IceInternal/Connection.java @@ -19,15 +19,7 @@ public final class Connection extends EventHandler public synchronized void validate() { - if(_exception != null) - { - throw _exception; - } - - if(_state != StateNotValidated) - { - return; - } + assert(_state == StateNotValidated); if(!_endpoint.datagram()) // Datagram connections are always implicitly validated. { @@ -35,24 +27,27 @@ public final class Connection extends EventHandler { if(_adapter != null) { - // - // Incoming connections play the active role with - // respect to connection validation. - // - BasicStream os = new BasicStream(_instance); - os.writeByte(Protocol.magic[0]); - os.writeByte(Protocol.magic[1]); - os.writeByte(Protocol.magic[2]); - os.writeByte(Protocol.magic[3]); - os.writeByte(Protocol.protocolMajor); - os.writeByte(Protocol.protocolMinor); - os.writeByte(Protocol.encodingMajor); - os.writeByte(Protocol.encodingMinor); - os.writeByte(Protocol.validateConnectionMsg); - os.writeByte((byte)0); // Compression status. - os.writeInt(Protocol.headerSize); // Message size. - TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); + synchronized(_sendMutex) + { + // + // Incoming connections play the active role + // with respect to connection validation. + // + BasicStream os = new BasicStream(_instance); + os.writeByte(Protocol.magic[0]); + os.writeByte(Protocol.magic[1]); + os.writeByte(Protocol.magic[2]); + os.writeByte(Protocol.magic[3]); + os.writeByte(Protocol.protocolMajor); + os.writeByte(Protocol.protocolMinor); + os.writeByte(Protocol.encodingMajor); + os.writeByte(Protocol.encodingMinor); + os.writeByte(Protocol.validateConnectionMsg); + os.writeByte((byte)0); // Compression status. + os.writeInt(Protocol.headerSize); // Message size. + TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); + } } else { @@ -158,24 +153,9 @@ public final class Connection extends EventHandler } } - // - // We only print warnings after successful connection validation. - // - _warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; - - // - // We only use active connection management after successful - // connection validation. We don't use active connection - // management for datagram connections at all, because such - // "virtual connections" cannot be reestablished. - // - if(!_endpoint.datagram()) + if(_acmTimeout > 0) { - _acmTimeout = _instance.properties().getPropertyAsInt("Ice.ConnectionIdleTime"); - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; - } + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; } // @@ -219,37 +199,21 @@ public final class Connection extends EventHandler } } - public boolean + public synchronized boolean isValidated() { - // - // No synchronization necessary, _state is declared - // volatile. Synchronization is not possible here anyway, - // because this function must not block. - // return _state > StateNotValidated; } - public boolean + public synchronized boolean isDestroyed() { - // - // No synchronization necessary, _state is declared - // volatile. Synchronization is not possible here anyway, - // because this function must not block. - // return _state >= StateClosing; } - public boolean + public synchronized boolean isFinished() { - // - // No synchronization necessary, _transceiver and - // _dispatchCount are declared volatile. Synchronization is - // not possible here anyway, because this function must not - // block. - // return _transceiver == null && _dispatchCount == 0; } @@ -317,8 +281,8 @@ public final class Connection extends EventHandler else { // - // We already waited long enough, so let's close this - // connection! + // We already waited long enough, so let's + // close this connection! // setState(StateClosed, new Ice.CloseTimeoutException()); } @@ -367,11 +331,14 @@ public final class Connection extends EventHandler // // Active connection management for idle connections. // - // TODO: Hack: ACM for incoming connections doesn't work right - // with AMI. - // - if(_acmTimeout > 0 && closingOK() && _adapter == null) + if(_acmTimeout > 0 && + _requests.isEmpty() && + _asyncRequests.isEmpty() && + !_batchStreamInUse && + _dispatchCount == 0) { + assert(_batchStream.isEmpty()); + if(System.currentTimeMillis() >= _acmAbsoluteTimeoutMillis) { setState(StateClosing, new Ice.ConnectionTimeoutException()); @@ -380,25 +347,6 @@ public final class Connection extends EventHandler } } - public synchronized void - incProxyCount() - { - assert(_proxyCount >= 0); - ++_proxyCount; - } - - public synchronized void - decProxyCount() - { - assert(_proxyCount > 0); - --_proxyCount; - - if(_proxyCount == 0 && _adapter == null && closingOK()) - { - setState(StateClosing, new Ice.CloseConnectionException()); - } - } - private final static byte[] _requestHdr = { Protocol.magic[0], @@ -415,122 +363,224 @@ public final class Connection extends EventHandler (byte)0, (byte)0, (byte)0, (byte)0 // Request ID (placeholder). }; + // + // TODO: Should not be a member function of Connection. + // public void prepareRequest(BasicStream os) { os.writeBlob(_requestHdr); } - public synchronized void - sendRequest(Outgoing out, boolean oneway) + public void + sendRequest(BasicStream os, Outgoing out) { - if(_exception != null) - { - throw _exception; - } - assert(_state > StateNotValidated); - assert(_state < StateClosing); - int requestId = 0; - - try + + synchronized(this) { - BasicStream os = out.os(); - os.pos(10); - + assert(!(out != null && _endpoint.datagram())); // Twoway requests cannot be datagrams. + + if(_exception != null) + { + throw _exception; + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + // - // Fill in the message size and request ID. + // Fill in the message size. // + os.pos(10); os.writeInt(os.size()); - if(!_endpoint.datagram() && !oneway) + + // + // Only add to the request map if this is a twoway call. + // + if(out != null) { + // + // Create a new unique request ID. + // requestId = _nextRequestId++; if(requestId <= 0) { _nextRequestId = 1; requestId = _nextRequestId++; } + + // + // Fill in the request ID. + // + os.pos(Protocol.headerSize); os.writeInt(requestId); + + // + // Add to the requests map. + // + _requests.put(requestId, out); + } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; } - - // - // Send the request. - // - TraceUtil.traceRequest("sending request", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); - } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - assert(_exception != null); - throw _exception; } - - // - // Only add to the request map if there was no exception, and if - // the operation is not oneway. - // - if(!_endpoint.datagram() && !oneway) + + try { - _requests.put(requestId, out); + synchronized(_sendMutex) + { + if(_transceiver == null) // Has the transceiver already been closed? + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } + + // + // Send the request. + // + TraceUtil.traceRequest("sending request", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); + } } - - if(_acmTimeout > 0) + catch(Ice.LocalException ex) { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + synchronized(this) + { + setState(StateClosed, ex); + assert(_exception != null); + + if(out != null) + { + // + // If the request has already been removed from + // the request map, we are out of luck. It would + // mean that finished() has been called already, + // and therefore the exception has been set using + // the Outgoing::finished() callback. In this + // case, we cannot throw the exception here, + // because we must not both raise an exception and + // have Outgoing::finished() called with an + // exception. This means that in some rare cases, + // a request will not be retried even though it + // could. But I honestly don't know how I could + // avoid this, without a very elaborate and + // complex design, which would be bad for + // performance. + // + Outgoing o = (Outgoing)_requests.remove(requestId); + if(o != null) + { + assert(o == out); + throw _exception; + } + } + else + { + throw _exception; + } + } } } - public synchronized void - sendAsyncRequest(OutgoingAsync out) + public void + sendAsyncRequest(BasicStream os, OutgoingAsync out) { - if(_exception != null) - { - throw _exception; - } - assert(_state > StateNotValidated); - assert(_state < StateClosing); - int requestId = 0; - - try + + synchronized(this) { - BasicStream os = out.__os(); - os.pos(10); - + assert(!_endpoint.datagram()); // Twoway requests cannot be datagrams, and async implies twoway. + + if(_exception != null) + { + throw _exception; + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + // - // Fill in the message size and request ID. + // Fill in the message size. // + os.pos(10); os.writeInt(os.size()); + + // + // Create a new unique request ID. + // requestId = _nextRequestId++; if(requestId <= 0) { _nextRequestId = 1; requestId = _nextRequestId++; } + + // + // Fill in the request ID. + // + os.pos(Protocol.headerSize); os.writeInt(requestId); // - // Send the request. + // Add to the requests map. // - TraceUtil.traceRequest("sending asynchronous request", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); + _asyncRequests.put(requestId, out); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } } - catch(Ice.LocalException ex) + + try { - setState(StateClosed, ex); - assert(_exception != null); - throw _exception; + synchronized(_sendMutex) + { + if(_transceiver == null) // Has the transceiver already been closed? + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } + + // + // Send the request. + // + TraceUtil.traceRequest("sending asynchronous request", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); + } } - - // - // Only add to the request map if there was no exception. - // - _asyncRequests.put(requestId, out); - - if(_acmTimeout > 0) + catch(Ice.LocalException ex) { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + synchronized(this) + { + setState(StateClosed, ex); + assert(_exception != null); + + // + // If the request has already been removed from the + // async request map, we are out of luck. It would + // mean that finished() has been called already, and + // therefore the exception has been set using the + // OutgoingAsync::__finished() callback. In this case, + // we cannot throw the exception here, because we must + // not both raise an exception and have + // OutgoingAsync::__finished() called with an + // exception. This means that in some rare cases, a + // request will not be retried even though it + // could. But I honestly don't know how I could avoid + // this, without a very elaborate and complex design, + // which would be bad for performance. + // + OutgoingAsync o = (OutgoingAsync)_asyncRequests.remove(requestId); + if(o != null) + { + assert(o == out); + throw _exception; + } + } } } @@ -568,6 +618,7 @@ public final class Connection extends EventHandler { throw _exception; } + assert(_state > StateNotValidated); assert(_state < StateClosing); @@ -589,7 +640,7 @@ public final class Connection extends EventHandler // // _batchStream now belongs to the caller, until - // finishBatchRequest() or abortBatchRequest() is called. + // finishBatchRequest() is called. // } @@ -600,6 +651,7 @@ public final class Connection extends EventHandler { throw _exception; } + assert(_state > StateNotValidated); assert(_state < StateClosing); @@ -614,135 +666,170 @@ public final class Connection extends EventHandler notifyAll(); } - public synchronized void - abortBatchRequest() - { - setState(StateClosed, new Ice.AbortBatchRequestException()); - - // - // Give the Connection back. - // - assert(_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); - } - - public synchronized void + public void flushBatchRequest() { - while(_batchStreamInUse && _exception == null) + synchronized(this) { - try + while(_batchStreamInUse && _exception == null) { - wait(); + try + { + wait(); + } + catch(InterruptedException ex) + { + } } - catch(InterruptedException ex) + + if(_exception != null) + { + throw _exception; + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + + if(_batchStream.isEmpty()) { + return; // Nothing to do. } - } - if(_exception != null) - { - throw _exception; + // + // Fill in the message size. + // + _batchStream.pos(10); + _batchStream.writeInt(_batchStream.size()); + + // + // Fill in the number of requests in the batch. + // + _batchStream.writeInt(_batchRequestNum); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } + + // + // Prevent that new batch requests are added while we are + // flushing. + // + _batchStreamInUse = true; } - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - if(!_batchStream.isEmpty()) + + try { - try + synchronized(_sendMutex) { - _batchStream.pos(10); - - // - // Fill in the message size. - // - _batchStream.writeInt(_batchStream.size()); - - // - // Fill in the number of requests in the batch. - // - _batchStream.writeInt(_batchRequestNum); + if(_transceiver == null) // Has the transceiver already been closed? + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } // // Send the batch request. // TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); _transceiver.write(_batchStream, _endpoint.timeout()); - - // - // Reset _batchStream so that new batch messages can be sent. - // - _batchStream.destroy(); - _batchStream = new BasicStream(_instance); - _batchRequestNum = 0; } - catch(Ice.LocalException ex) + } + catch(Ice.LocalException ex) + { + synchronized(this) { setState(StateClosed, ex); assert(_exception != null); + + // + // Since batch requests are all oneways (or datagrams), we + // must report the exception to the caller. + // throw _exception; } - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; - } } - if(_proxyCount == 0 && _adapter == null && closingOK()) + synchronized(this) { - setState(StateClosing, new Ice.CloseConnectionException()); + // + // Reset the batch stream, and notify that flushing is over. + // + _batchStream.destroy(); + _batchStream = new BasicStream(_instance); + _batchRequestNum = 0; + _batchStreamInUse = false; + notifyAll(); } } - public synchronized void + public void sendResponse(BasicStream os, byte compress) { try { - if(--_dispatchCount == 0) + synchronized(_sendMutex) { - notifyAll(); - } - - if(_state == StateClosed) - { - return; - } + if(_transceiver == null) // Has the transceiver already been closed? + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } - // - // Fill in the message size. - // - os.pos(10); - final int sz = os.size(); - os.writeInt(sz); - - // - // Send the reply. - // - TraceUtil.traceReply("sending reply", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); - - if(_state == StateClosing && _dispatchCount == 0) - { - initiateShutdown(); + // + // Fill in the message size. + // + os.pos(10); + os.writeInt(os.size()); + + // + // Send the reply. + // + TraceUtil.traceReply("sending reply", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); } } catch(Ice.LocalException ex) { - setState(StateClosed, ex); + synchronized(this) + { + setState(StateClosed, ex); + } } - if(_acmTimeout > 0) + synchronized(this) { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + assert(_state > StateNotValidated); + + try + { + if(--_dispatchCount == 0) + { + notifyAll(); + } + + if(_state == StateClosing && _dispatchCount == 0) + { + initiateShutdown(); + } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } + } + catch(Ice.LocalException ex) + { + setState(StateClosed, ex); + } } } public synchronized void sendNoResponse() { + assert(_state > StateNotValidated); + try { if(--_dispatchCount == 0) @@ -750,11 +837,6 @@ public final class Connection extends EventHandler notifyAll(); } - if(_state == StateClosed) - { - return; - } - if(_state == StateClosing && _dispatchCount == 0) { initiateShutdown(); @@ -914,8 +996,7 @@ public final class Connection extends EventHandler { if(_warn) { - _logger.warning("ignoring close connection message for datagram connection:\n" + - _transceiver.toString()); + _logger.warning("ignoring close connection message for datagram connection:\n" + _desc); } } else @@ -946,6 +1027,8 @@ public final class Connection extends EventHandler synchronized(this) { + assert(_state > StateNotValidated); + if(_state == StateClosed) { return; @@ -1015,11 +1098,6 @@ public final class Connection extends EventHandler { throw new Ice.UnknownRequestIdException(); } - - if(_proxyCount == 0 && _adapter == null && closingOK()) - { - setState(StateClosing, new Ice.CloseConnectionException()); - } } break; } @@ -1029,8 +1107,7 @@ public final class Connection extends EventHandler TraceUtil.traceHeader("received validate connection", stream, _logger, _traceLevels); if(_warn) { - _logger.warning("ignoring unexpected validate connection message:\n" + - _transceiver.toString()); + _logger.warning("ignoring unexpected validate connection message:\n" + _desc); } break; } @@ -1148,7 +1225,7 @@ public final class Connection extends EventHandler { threadPool.promoteFollower(); - Ice.LocalException closeException = null; + Ice.LocalException exception = null; IntMap requests = null; IntMap asyncRequests = null; @@ -1159,20 +1236,26 @@ public final class Connection extends EventHandler { registerWithPool(); } - else if(_state == StateClosed && _transceiver != null) + else if(_state == StateClosed) { - try - { - _transceiver.close(); - } - catch(Ice.LocalException ex) + // + // We must make sure that nobody is sending when we + // close the transceiver. + // + synchronized(_sendMutex) { - closeException = ex; + try + { + _transceiver.close(); + } + catch(Ice.LocalException ex) + { + exception = ex; + } + + _transceiver = null; + notifyAll(); } - - _transceiver = null; - _threadPool = null; // We don't need the thread pool anymore. - notifyAll(); } if(_state == StateClosed || _state == StateClosing) @@ -1207,9 +1290,9 @@ public final class Connection extends EventHandler } } - if(closeException != null) + if(exception != null) { - throw closeException; + throw exception; } } @@ -1219,31 +1302,30 @@ public final class Connection extends EventHandler setState(StateClosed, ex); } - public synchronized String + public String toString() { - assert(_transceiver != null); - return _transceiver.toString(); + return _desc; // No mutex lock, _desc is immutable. } Connection(Instance instance, Transceiver transceiver, Endpoint endpoint, Ice.ObjectAdapter adapter) { super(instance); _transceiver = transceiver; + _desc = transceiver.toString(); _endpoint = endpoint; _adapter = adapter; - _logger = instance.logger(); // Cached for better performance. - _traceLevels = instance.traceLevels(); // Cached for better performance. + _logger = instance.logger(); // Cached for better performance. + _traceLevels = instance.traceLevels(); // Cached for better performance. _registeredWithPool = false; - _warn = false; - _acmTimeout = 0; + _warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; + _acmTimeout = _endpoint.datagram() ? 0 : _instance.connectionIdleTime(); _acmAbsoluteTimeoutMillis = 0; _nextRequestId = 1; _batchStream = new BasicStream(instance); _batchStreamInUse = false; _batchRequestNum = 0; _dispatchCount = 0; - _proxyCount = 0; _state = StateNotValidated; _stateTime = System.currentTimeMillis(); @@ -1266,7 +1348,6 @@ public final class Connection extends EventHandler assert(_state == StateClosed); assert(_transceiver == null); assert(_dispatchCount == 0); - assert(_proxyCount == 0); assert(_incomingCache == null); _batchStream.destroy(); @@ -1400,17 +1481,24 @@ public final class Connection extends EventHandler { assert(!_registeredWithPool); - try - { - _transceiver.close(); - } - catch(Ice.LocalException ex) + // + // We must make sure that nobody is sending when + // we close the transceiver. + // + synchronized(_sendMutex) { - // Here we ignore any exceptions in close(). - } + try + { + _transceiver.close(); + } + catch(Ice.LocalException ex) + { + // Here we ignore any exceptions in close(). + } - _transceiver = null; - _threadPool = null; // We don't need the thread pool anymore. + _transceiver = null; + //notifyAll(); // We notify already below. + } } else { @@ -1446,29 +1534,32 @@ public final class Connection extends EventHandler if(!_endpoint.datagram()) { - // - // Before we shut down, we send a close connection - // message. - // - BasicStream os = new BasicStream(_instance); - os.writeByte(Protocol.magic[0]); - os.writeByte(Protocol.magic[1]); - os.writeByte(Protocol.magic[2]); - os.writeByte(Protocol.magic[3]); - os.writeByte(Protocol.protocolMajor); - os.writeByte(Protocol.protocolMinor); - os.writeByte(Protocol.encodingMajor); - os.writeByte(Protocol.encodingMinor); - os.writeByte(Protocol.closeConnectionMsg); - os.writeByte((byte)0); // Compression status. - os.writeInt(Protocol.headerSize); // Message size. - - // - // Send the message. - // - TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); - _transceiver.shutdown(); + synchronized(_sendMutex) + { + // + // Before we shut down, we send a close connection + // message. + // + BasicStream os = new BasicStream(_instance); + os.writeByte(Protocol.magic[0]); + os.writeByte(Protocol.magic[1]); + os.writeByte(Protocol.magic[2]); + os.writeByte(Protocol.magic[3]); + os.writeByte(Protocol.protocolMajor); + os.writeByte(Protocol.protocolMinor); + os.writeByte(Protocol.encodingMajor); + os.writeByte(Protocol.encodingMinor); + os.writeByte(Protocol.closeConnectionMsg); + os.writeByte((byte)0); // Compression status. + os.writeInt(Protocol.headerSize); // Message size. + + // + // Send the message. + // + TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); + _transceiver.shutdown(); + } } } @@ -1477,7 +1568,6 @@ public final class Connection extends EventHandler { if(!_registeredWithPool) { - assert(_threadPool != null); _threadPool._register(_transceiver.fd(), this); _registeredWithPool = true; @@ -1494,7 +1584,6 @@ public final class Connection extends EventHandler { if(_registeredWithPool) { - assert(_threadPool != null); _threadPool.unregister(_transceiver.fd()); _registeredWithPool = false; @@ -1523,7 +1612,7 @@ public final class Connection extends EventHandler } while(t != null); pw.flush(); - String s = msg + ":\n" + sw.toString() + _transceiver.toString(); + String s = msg + ":\n" + sw.toString() + _desc; _logger.warning(s); } @@ -1581,18 +1670,8 @@ public final class Connection extends EventHandler } */ - private boolean - closingOK() - { - return - _requests.isEmpty() && - _asyncRequests.isEmpty() && - !_batchStreamInUse && - _batchStream.isEmpty() && - _dispatchCount == 0; - } - - private volatile Transceiver _transceiver; // Must be volatile, see comment in isFinished(). + private Transceiver _transceiver; + private final String _desc; private final Endpoint _endpoint; private Ice.ObjectAdapter _adapter; @@ -1602,11 +1681,11 @@ public final class Connection extends EventHandler private final TraceLevels _traceLevels; private boolean _registeredWithPool; - private ThreadPool _threadPool; + private final ThreadPool _threadPool; - private boolean _warn; + private final boolean _warn; - private int _acmTimeout; + private final int _acmTimeout; private long _acmAbsoluteTimeoutMillis; private int _nextRequestId; @@ -1619,12 +1698,16 @@ public final class Connection extends EventHandler private boolean _batchStreamInUse; private int _batchRequestNum; - private volatile int _dispatchCount; // Must be volatile, see comment in isDestroyed(). + private int _dispatchCount; - private int _proxyCount; + private int _state; // The current state. + private long _stateTime; // The last time when the state was changed. - private volatile int _state; // The current state. Must be volatile, see comment in isDestroyed(). - private long _stateTime; // The time when the state was changed the last time. + // + // We have a separate mutex for sending, so that we don't block + // the whole connection when we do a blocking send. + // + private java.lang.Object _sendMutex = new java.lang.Object(); private Incoming _incomingCache; private java.lang.Object _incomingCacheMutex = new java.lang.Object(); |