summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/Connection.java
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2004-02-20 17:44:41 +0000
committerMarc Laukien <marc@zeroc.com>2004-02-20 17:44:41 +0000
commitf86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad (patch)
tree520786ae72c4376b505f21f8adf9f5ea522cf9bf /java/src/IceInternal/Connection.java
parentWin32 fixes (diff)
downloadice-f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad.tar.bz2
ice-f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad.tar.xz
ice-f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad.zip
C++ -> Java
Diffstat (limited to 'java/src/IceInternal/Connection.java')
-rw-r--r--java/src/IceInternal/Connection.java741
1 files changed, 412 insertions, 329 deletions
diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java
index bce9f51a5dd..b9919b9adb9 100644
--- a/java/src/IceInternal/Connection.java
+++ b/java/src/IceInternal/Connection.java
@@ -19,15 +19,7 @@ public final class Connection extends EventHandler
public synchronized void
validate()
{
- if(_exception != null)
- {
- throw _exception;
- }
-
- if(_state != StateNotValidated)
- {
- return;
- }
+ assert(_state == StateNotValidated);
if(!_endpoint.datagram()) // Datagram connections are always implicitly validated.
{
@@ -35,24 +27,27 @@ public final class Connection extends EventHandler
{
if(_adapter != null)
{
- //
- // Incoming connections play the active role with
- // respect to connection validation.
- //
- BasicStream os = new BasicStream(_instance);
- os.writeByte(Protocol.magic[0]);
- os.writeByte(Protocol.magic[1]);
- os.writeByte(Protocol.magic[2]);
- os.writeByte(Protocol.magic[3]);
- os.writeByte(Protocol.protocolMajor);
- os.writeByte(Protocol.protocolMinor);
- os.writeByte(Protocol.encodingMajor);
- os.writeByte(Protocol.encodingMinor);
- os.writeByte(Protocol.validateConnectionMsg);
- os.writeByte((byte)0); // Compression status.
- os.writeInt(Protocol.headerSize); // Message size.
- TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
+ synchronized(_sendMutex)
+ {
+ //
+ // Incoming connections play the active role
+ // with respect to connection validation.
+ //
+ BasicStream os = new BasicStream(_instance);
+ os.writeByte(Protocol.magic[0]);
+ os.writeByte(Protocol.magic[1]);
+ os.writeByte(Protocol.magic[2]);
+ os.writeByte(Protocol.magic[3]);
+ os.writeByte(Protocol.protocolMajor);
+ os.writeByte(Protocol.protocolMinor);
+ os.writeByte(Protocol.encodingMajor);
+ os.writeByte(Protocol.encodingMinor);
+ os.writeByte(Protocol.validateConnectionMsg);
+ os.writeByte((byte)0); // Compression status.
+ os.writeInt(Protocol.headerSize); // Message size.
+ TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
+ }
}
else
{
@@ -158,24 +153,9 @@ public final class Connection extends EventHandler
}
}
- //
- // We only print warnings after successful connection validation.
- //
- _warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false;
-
- //
- // We only use active connection management after successful
- // connection validation. We don't use active connection
- // management for datagram connections at all, because such
- // "virtual connections" cannot be reestablished.
- //
- if(!_endpoint.datagram())
+ if(_acmTimeout > 0)
{
- _acmTimeout = _instance.properties().getPropertyAsInt("Ice.ConnectionIdleTime");
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
- }
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
}
//
@@ -219,37 +199,21 @@ public final class Connection extends EventHandler
}
}
- public boolean
+ public synchronized boolean
isValidated()
{
- //
- // No synchronization necessary, _state is declared
- // volatile. Synchronization is not possible here anyway,
- // because this function must not block.
- //
return _state > StateNotValidated;
}
- public boolean
+ public synchronized boolean
isDestroyed()
{
- //
- // No synchronization necessary, _state is declared
- // volatile. Synchronization is not possible here anyway,
- // because this function must not block.
- //
return _state >= StateClosing;
}
- public boolean
+ public synchronized boolean
isFinished()
{
- //
- // No synchronization necessary, _transceiver and
- // _dispatchCount are declared volatile. Synchronization is
- // not possible here anyway, because this function must not
- // block.
- //
return _transceiver == null && _dispatchCount == 0;
}
@@ -317,8 +281,8 @@ public final class Connection extends EventHandler
else
{
//
- // We already waited long enough, so let's close this
- // connection!
+ // We already waited long enough, so let's
+ // close this connection!
//
setState(StateClosed, new Ice.CloseTimeoutException());
}
@@ -367,11 +331,14 @@ public final class Connection extends EventHandler
//
// Active connection management for idle connections.
//
- // TODO: Hack: ACM for incoming connections doesn't work right
- // with AMI.
- //
- if(_acmTimeout > 0 && closingOK() && _adapter == null)
+ if(_acmTimeout > 0 &&
+ _requests.isEmpty() &&
+ _asyncRequests.isEmpty() &&
+ !_batchStreamInUse &&
+ _dispatchCount == 0)
{
+ assert(_batchStream.isEmpty());
+
if(System.currentTimeMillis() >= _acmAbsoluteTimeoutMillis)
{
setState(StateClosing, new Ice.ConnectionTimeoutException());
@@ -380,25 +347,6 @@ public final class Connection extends EventHandler
}
}
- public synchronized void
- incProxyCount()
- {
- assert(_proxyCount >= 0);
- ++_proxyCount;
- }
-
- public synchronized void
- decProxyCount()
- {
- assert(_proxyCount > 0);
- --_proxyCount;
-
- if(_proxyCount == 0 && _adapter == null && closingOK())
- {
- setState(StateClosing, new Ice.CloseConnectionException());
- }
- }
-
private final static byte[] _requestHdr =
{
Protocol.magic[0],
@@ -415,122 +363,224 @@ public final class Connection extends EventHandler
(byte)0, (byte)0, (byte)0, (byte)0 // Request ID (placeholder).
};
+ //
+ // TODO: Should not be a member function of Connection.
+ //
public void
prepareRequest(BasicStream os)
{
os.writeBlob(_requestHdr);
}
- public synchronized void
- sendRequest(Outgoing out, boolean oneway)
+ public void
+ sendRequest(BasicStream os, Outgoing out)
{
- if(_exception != null)
- {
- throw _exception;
- }
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
int requestId = 0;
-
- try
+
+ synchronized(this)
{
- BasicStream os = out.os();
- os.pos(10);
-
+ assert(!(out != null && _endpoint.datagram())); // Twoway requests cannot be datagrams.
+
+ if(_exception != null)
+ {
+ throw _exception;
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
//
- // Fill in the message size and request ID.
+ // Fill in the message size.
//
+ os.pos(10);
os.writeInt(os.size());
- if(!_endpoint.datagram() && !oneway)
+
+ //
+ // Only add to the request map if this is a twoway call.
+ //
+ if(out != null)
{
+ //
+ // Create a new unique request ID.
+ //
requestId = _nextRequestId++;
if(requestId <= 0)
{
_nextRequestId = 1;
requestId = _nextRequestId++;
}
+
+ //
+ // Fill in the request ID.
+ //
+ os.pos(Protocol.headerSize);
os.writeInt(requestId);
+
+ //
+ // Add to the requests map.
+ //
+ _requests.put(requestId, out);
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
}
-
- //
- // Send the request.
- //
- TraceUtil.traceRequest("sending request", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- assert(_exception != null);
- throw _exception;
}
-
- //
- // Only add to the request map if there was no exception, and if
- // the operation is not oneway.
- //
- if(!_endpoint.datagram() && !oneway)
+
+ try
{
- _requests.put(requestId, out);
+ synchronized(_sendMutex)
+ {
+ if(_transceiver == null) // Has the transceiver already been closed?
+ {
+ assert(_exception != null);
+ throw _exception; // The exception is immutable at this point.
+ }
+
+ //
+ // Send the request.
+ //
+ TraceUtil.traceRequest("sending request", os, _logger, _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
+ }
}
-
- if(_acmTimeout > 0)
+ catch(Ice.LocalException ex)
{
- _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ synchronized(this)
+ {
+ setState(StateClosed, ex);
+ assert(_exception != null);
+
+ if(out != null)
+ {
+ //
+ // If the request has already been removed from
+ // the request map, we are out of luck. It would
+ // mean that finished() has been called already,
+ // and therefore the exception has been set using
+ // the Outgoing::finished() callback. In this
+ // case, we cannot throw the exception here,
+ // because we must not both raise an exception and
+ // have Outgoing::finished() called with an
+ // exception. This means that in some rare cases,
+ // a request will not be retried even though it
+ // could. But I honestly don't know how I could
+ // avoid this, without a very elaborate and
+ // complex design, which would be bad for
+ // performance.
+ //
+ Outgoing o = (Outgoing)_requests.remove(requestId);
+ if(o != null)
+ {
+ assert(o == out);
+ throw _exception;
+ }
+ }
+ else
+ {
+ throw _exception;
+ }
+ }
}
}
- public synchronized void
- sendAsyncRequest(OutgoingAsync out)
+ public void
+ sendAsyncRequest(BasicStream os, OutgoingAsync out)
{
- if(_exception != null)
- {
- throw _exception;
- }
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
int requestId = 0;
-
- try
+
+ synchronized(this)
{
- BasicStream os = out.__os();
- os.pos(10);
-
+ assert(!_endpoint.datagram()); // Twoway requests cannot be datagrams, and async implies twoway.
+
+ if(_exception != null)
+ {
+ throw _exception;
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
//
- // Fill in the message size and request ID.
+ // Fill in the message size.
//
+ os.pos(10);
os.writeInt(os.size());
+
+ //
+ // Create a new unique request ID.
+ //
requestId = _nextRequestId++;
if(requestId <= 0)
{
_nextRequestId = 1;
requestId = _nextRequestId++;
}
+
+ //
+ // Fill in the request ID.
+ //
+ os.pos(Protocol.headerSize);
os.writeInt(requestId);
//
- // Send the request.
+ // Add to the requests map.
//
- TraceUtil.traceRequest("sending asynchronous request", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
+ _asyncRequests.put(requestId, out);
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ }
}
- catch(Ice.LocalException ex)
+
+ try
{
- setState(StateClosed, ex);
- assert(_exception != null);
- throw _exception;
+ synchronized(_sendMutex)
+ {
+ if(_transceiver == null) // Has the transceiver already been closed?
+ {
+ assert(_exception != null);
+ throw _exception; // The exception is immutable at this point.
+ }
+
+ //
+ // Send the request.
+ //
+ TraceUtil.traceRequest("sending asynchronous request", os, _logger, _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
+ }
}
-
- //
- // Only add to the request map if there was no exception.
- //
- _asyncRequests.put(requestId, out);
-
- if(_acmTimeout > 0)
+ catch(Ice.LocalException ex)
{
- _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ synchronized(this)
+ {
+ setState(StateClosed, ex);
+ assert(_exception != null);
+
+ //
+ // If the request has already been removed from the
+ // async request map, we are out of luck. It would
+ // mean that finished() has been called already, and
+ // therefore the exception has been set using the
+ // OutgoingAsync::__finished() callback. In this case,
+ // we cannot throw the exception here, because we must
+ // not both raise an exception and have
+ // OutgoingAsync::__finished() called with an
+ // exception. This means that in some rare cases, a
+ // request will not be retried even though it
+ // could. But I honestly don't know how I could avoid
+ // this, without a very elaborate and complex design,
+ // which would be bad for performance.
+ //
+ OutgoingAsync o = (OutgoingAsync)_asyncRequests.remove(requestId);
+ if(o != null)
+ {
+ assert(o == out);
+ throw _exception;
+ }
+ }
}
}
@@ -568,6 +618,7 @@ public final class Connection extends EventHandler
{
throw _exception;
}
+
assert(_state > StateNotValidated);
assert(_state < StateClosing);
@@ -589,7 +640,7 @@ public final class Connection extends EventHandler
//
// _batchStream now belongs to the caller, until
- // finishBatchRequest() or abortBatchRequest() is called.
+ // finishBatchRequest() is called.
//
}
@@ -600,6 +651,7 @@ public final class Connection extends EventHandler
{
throw _exception;
}
+
assert(_state > StateNotValidated);
assert(_state < StateClosing);
@@ -614,135 +666,170 @@ public final class Connection extends EventHandler
notifyAll();
}
- public synchronized void
- abortBatchRequest()
- {
- setState(StateClosed, new Ice.AbortBatchRequestException());
-
- //
- // Give the Connection back.
- //
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
- }
-
- public synchronized void
+ public void
flushBatchRequest()
{
- while(_batchStreamInUse && _exception == null)
+ synchronized(this)
{
- try
+ while(_batchStreamInUse && _exception == null)
{
- wait();
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
}
- catch(InterruptedException ex)
+
+ if(_exception != null)
+ {
+ throw _exception;
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
+ if(_batchStream.isEmpty())
{
+ return; // Nothing to do.
}
- }
- if(_exception != null)
- {
- throw _exception;
+ //
+ // Fill in the message size.
+ //
+ _batchStream.pos(10);
+ _batchStream.writeInt(_batchStream.size());
+
+ //
+ // Fill in the number of requests in the batch.
+ //
+ _batchStream.writeInt(_batchRequestNum);
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ }
+
+ //
+ // Prevent that new batch requests are added while we are
+ // flushing.
+ //
+ _batchStreamInUse = true;
}
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- if(!_batchStream.isEmpty())
+
+ try
{
- try
+ synchronized(_sendMutex)
{
- _batchStream.pos(10);
-
- //
- // Fill in the message size.
- //
- _batchStream.writeInt(_batchStream.size());
-
- //
- // Fill in the number of requests in the batch.
- //
- _batchStream.writeInt(_batchRequestNum);
+ if(_transceiver == null) // Has the transceiver already been closed?
+ {
+ assert(_exception != null);
+ throw _exception; // The exception is immutable at this point.
+ }
//
// Send the batch request.
//
TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
_transceiver.write(_batchStream, _endpoint.timeout());
-
- //
- // Reset _batchStream so that new batch messages can be sent.
- //
- _batchStream.destroy();
- _batchStream = new BasicStream(_instance);
- _batchRequestNum = 0;
}
- catch(Ice.LocalException ex)
+ }
+ catch(Ice.LocalException ex)
+ {
+ synchronized(this)
{
setState(StateClosed, ex);
assert(_exception != null);
+
+ //
+ // Since batch requests are all oneways (or datagrams), we
+ // must report the exception to the caller.
+ //
throw _exception;
}
-
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
- }
}
- if(_proxyCount == 0 && _adapter == null && closingOK())
+ synchronized(this)
{
- setState(StateClosing, new Ice.CloseConnectionException());
+ //
+ // Reset the batch stream, and notify that flushing is over.
+ //
+ _batchStream.destroy();
+ _batchStream = new BasicStream(_instance);
+ _batchRequestNum = 0;
+ _batchStreamInUse = false;
+ notifyAll();
}
}
- public synchronized void
+ public void
sendResponse(BasicStream os, byte compress)
{
try
{
- if(--_dispatchCount == 0)
+ synchronized(_sendMutex)
{
- notifyAll();
- }
-
- if(_state == StateClosed)
- {
- return;
- }
+ if(_transceiver == null) // Has the transceiver already been closed?
+ {
+ assert(_exception != null);
+ throw _exception; // The exception is immutable at this point.
+ }
- //
- // Fill in the message size.
- //
- os.pos(10);
- final int sz = os.size();
- os.writeInt(sz);
-
- //
- // Send the reply.
- //
- TraceUtil.traceReply("sending reply", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
-
- if(_state == StateClosing && _dispatchCount == 0)
- {
- initiateShutdown();
+ //
+ // Fill in the message size.
+ //
+ os.pos(10);
+ os.writeInt(os.size());
+
+ //
+ // Send the reply.
+ //
+ TraceUtil.traceReply("sending reply", os, _logger, _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
}
}
catch(Ice.LocalException ex)
{
- setState(StateClosed, ex);
+ synchronized(this)
+ {
+ setState(StateClosed, ex);
+ }
}
- if(_acmTimeout > 0)
+ synchronized(this)
{
- _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ assert(_state > StateNotValidated);
+
+ try
+ {
+ if(--_dispatchCount == 0)
+ {
+ notifyAll();
+ }
+
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ initiateShutdown();
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ }
}
}
public synchronized void
sendNoResponse()
{
+ assert(_state > StateNotValidated);
+
try
{
if(--_dispatchCount == 0)
@@ -750,11 +837,6 @@ public final class Connection extends EventHandler
notifyAll();
}
- if(_state == StateClosed)
- {
- return;
- }
-
if(_state == StateClosing && _dispatchCount == 0)
{
initiateShutdown();
@@ -914,8 +996,7 @@ public final class Connection extends EventHandler
{
if(_warn)
{
- _logger.warning("ignoring close connection message for datagram connection:\n" +
- _transceiver.toString());
+ _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
}
}
else
@@ -946,6 +1027,8 @@ public final class Connection extends EventHandler
synchronized(this)
{
+ assert(_state > StateNotValidated);
+
if(_state == StateClosed)
{
return;
@@ -1015,11 +1098,6 @@ public final class Connection extends EventHandler
{
throw new Ice.UnknownRequestIdException();
}
-
- if(_proxyCount == 0 && _adapter == null && closingOK())
- {
- setState(StateClosing, new Ice.CloseConnectionException());
- }
}
break;
}
@@ -1029,8 +1107,7 @@ public final class Connection extends EventHandler
TraceUtil.traceHeader("received validate connection", stream, _logger, _traceLevels);
if(_warn)
{
- _logger.warning("ignoring unexpected validate connection message:\n" +
- _transceiver.toString());
+ _logger.warning("ignoring unexpected validate connection message:\n" + _desc);
}
break;
}
@@ -1148,7 +1225,7 @@ public final class Connection extends EventHandler
{
threadPool.promoteFollower();
- Ice.LocalException closeException = null;
+ Ice.LocalException exception = null;
IntMap requests = null;
IntMap asyncRequests = null;
@@ -1159,20 +1236,26 @@ public final class Connection extends EventHandler
{
registerWithPool();
}
- else if(_state == StateClosed && _transceiver != null)
+ else if(_state == StateClosed)
{
- try
- {
- _transceiver.close();
- }
- catch(Ice.LocalException ex)
+ //
+ // We must make sure that nobody is sending when we
+ // close the transceiver.
+ //
+ synchronized(_sendMutex)
{
- closeException = ex;
+ try
+ {
+ _transceiver.close();
+ }
+ catch(Ice.LocalException ex)
+ {
+ exception = ex;
+ }
+
+ _transceiver = null;
+ notifyAll();
}
-
- _transceiver = null;
- _threadPool = null; // We don't need the thread pool anymore.
- notifyAll();
}
if(_state == StateClosed || _state == StateClosing)
@@ -1207,9 +1290,9 @@ public final class Connection extends EventHandler
}
}
- if(closeException != null)
+ if(exception != null)
{
- throw closeException;
+ throw exception;
}
}
@@ -1219,31 +1302,30 @@ public final class Connection extends EventHandler
setState(StateClosed, ex);
}
- public synchronized String
+ public String
toString()
{
- assert(_transceiver != null);
- return _transceiver.toString();
+ return _desc; // No mutex lock, _desc is immutable.
}
Connection(Instance instance, Transceiver transceiver, Endpoint endpoint, Ice.ObjectAdapter adapter)
{
super(instance);
_transceiver = transceiver;
+ _desc = transceiver.toString();
_endpoint = endpoint;
_adapter = adapter;
- _logger = instance.logger(); // Cached for better performance.
- _traceLevels = instance.traceLevels(); // Cached for better performance.
+ _logger = instance.logger(); // Cached for better performance.
+ _traceLevels = instance.traceLevels(); // Cached for better performance.
_registeredWithPool = false;
- _warn = false;
- _acmTimeout = 0;
+ _warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false;
+ _acmTimeout = _endpoint.datagram() ? 0 : _instance.connectionIdleTime();
_acmAbsoluteTimeoutMillis = 0;
_nextRequestId = 1;
_batchStream = new BasicStream(instance);
_batchStreamInUse = false;
_batchRequestNum = 0;
_dispatchCount = 0;
- _proxyCount = 0;
_state = StateNotValidated;
_stateTime = System.currentTimeMillis();
@@ -1266,7 +1348,6 @@ public final class Connection extends EventHandler
assert(_state == StateClosed);
assert(_transceiver == null);
assert(_dispatchCount == 0);
- assert(_proxyCount == 0);
assert(_incomingCache == null);
_batchStream.destroy();
@@ -1400,17 +1481,24 @@ public final class Connection extends EventHandler
{
assert(!_registeredWithPool);
- try
- {
- _transceiver.close();
- }
- catch(Ice.LocalException ex)
+ //
+ // We must make sure that nobody is sending when
+ // we close the transceiver.
+ //
+ synchronized(_sendMutex)
{
- // Here we ignore any exceptions in close().
- }
+ try
+ {
+ _transceiver.close();
+ }
+ catch(Ice.LocalException ex)
+ {
+ // Here we ignore any exceptions in close().
+ }
- _transceiver = null;
- _threadPool = null; // We don't need the thread pool anymore.
+ _transceiver = null;
+ //notifyAll(); // We notify already below.
+ }
}
else
{
@@ -1446,29 +1534,32 @@ public final class Connection extends EventHandler
if(!_endpoint.datagram())
{
- //
- // Before we shut down, we send a close connection
- // message.
- //
- BasicStream os = new BasicStream(_instance);
- os.writeByte(Protocol.magic[0]);
- os.writeByte(Protocol.magic[1]);
- os.writeByte(Protocol.magic[2]);
- os.writeByte(Protocol.magic[3]);
- os.writeByte(Protocol.protocolMajor);
- os.writeByte(Protocol.protocolMinor);
- os.writeByte(Protocol.encodingMajor);
- os.writeByte(Protocol.encodingMinor);
- os.writeByte(Protocol.closeConnectionMsg);
- os.writeByte((byte)0); // Compression status.
- os.writeInt(Protocol.headerSize); // Message size.
-
- //
- // Send the message.
- //
- TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
- _transceiver.shutdown();
+ synchronized(_sendMutex)
+ {
+ //
+ // Before we shut down, we send a close connection
+ // message.
+ //
+ BasicStream os = new BasicStream(_instance);
+ os.writeByte(Protocol.magic[0]);
+ os.writeByte(Protocol.magic[1]);
+ os.writeByte(Protocol.magic[2]);
+ os.writeByte(Protocol.magic[3]);
+ os.writeByte(Protocol.protocolMajor);
+ os.writeByte(Protocol.protocolMinor);
+ os.writeByte(Protocol.encodingMajor);
+ os.writeByte(Protocol.encodingMinor);
+ os.writeByte(Protocol.closeConnectionMsg);
+ os.writeByte((byte)0); // Compression status.
+ os.writeInt(Protocol.headerSize); // Message size.
+
+ //
+ // Send the message.
+ //
+ TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
+ _transceiver.shutdown();
+ }
}
}
@@ -1477,7 +1568,6 @@ public final class Connection extends EventHandler
{
if(!_registeredWithPool)
{
- assert(_threadPool != null);
_threadPool._register(_transceiver.fd(), this);
_registeredWithPool = true;
@@ -1494,7 +1584,6 @@ public final class Connection extends EventHandler
{
if(_registeredWithPool)
{
- assert(_threadPool != null);
_threadPool.unregister(_transceiver.fd());
_registeredWithPool = false;
@@ -1523,7 +1612,7 @@ public final class Connection extends EventHandler
}
while(t != null);
pw.flush();
- String s = msg + ":\n" + sw.toString() + _transceiver.toString();
+ String s = msg + ":\n" + sw.toString() + _desc;
_logger.warning(s);
}
@@ -1581,18 +1670,8 @@ public final class Connection extends EventHandler
}
*/
- private boolean
- closingOK()
- {
- return
- _requests.isEmpty() &&
- _asyncRequests.isEmpty() &&
- !_batchStreamInUse &&
- _batchStream.isEmpty() &&
- _dispatchCount == 0;
- }
-
- private volatile Transceiver _transceiver; // Must be volatile, see comment in isFinished().
+ private Transceiver _transceiver;
+ private final String _desc;
private final Endpoint _endpoint;
private Ice.ObjectAdapter _adapter;
@@ -1602,11 +1681,11 @@ public final class Connection extends EventHandler
private final TraceLevels _traceLevels;
private boolean _registeredWithPool;
- private ThreadPool _threadPool;
+ private final ThreadPool _threadPool;
- private boolean _warn;
+ private final boolean _warn;
- private int _acmTimeout;
+ private final int _acmTimeout;
private long _acmAbsoluteTimeoutMillis;
private int _nextRequestId;
@@ -1619,12 +1698,16 @@ public final class Connection extends EventHandler
private boolean _batchStreamInUse;
private int _batchRequestNum;
- private volatile int _dispatchCount; // Must be volatile, see comment in isDestroyed().
+ private int _dispatchCount;
- private int _proxyCount;
+ private int _state; // The current state.
+ private long _stateTime; // The last time when the state was changed.
- private volatile int _state; // The current state. Must be volatile, see comment in isDestroyed().
- private long _stateTime; // The time when the state was changed the last time.
+ //
+ // We have a separate mutex for sending, so that we don't block
+ // the whole connection when we do a blocking send.
+ //
+ private java.lang.Object _sendMutex = new java.lang.Object();
private Incoming _incomingCache;
private java.lang.Object _incomingCacheMutex = new java.lang.Object();