summaryrefslogtreecommitdiff
path: root/java/src/Ice/ConnectionI.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r--java/src/Ice/ConnectionI.java612
1 files changed, 349 insertions, 263 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index c001462c960..fb2c0ce8a33 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -341,7 +341,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
sendRequest(IceInternal.Outgoing out, boolean compress, boolean response)
throws IceInternal.LocalExceptionWrapper
{
- int requestId = 0;
final IceInternal.BasicStream os = out.os();
if(_exception != null)
@@ -363,6 +362,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
_transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax());
+ int requestId = 0;
if(response)
{
//
@@ -382,7 +382,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
os.writeInt(requestId);
}
- out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
+ out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
os.size() - IceInternal.Protocol.headerSize - 4);
//
@@ -418,7 +418,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response)
throws IceInternal.LocalExceptionWrapper
{
- int requestId = 0;
final IceInternal.BasicStream os = out.__getOs();
if(_exception != null)
@@ -440,6 +439,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
_transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax());
+ int requestId = 0;
if(response)
{
//
@@ -459,13 +459,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
os.writeInt(requestId);
}
- out.__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
+ out.__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
os.size() - IceInternal.Protocol.headerSize - 4);
int status;
try
{
- status = sendMessage(new OutgoingMessage(out, out.__getOs(), compress, requestId));
+ status = sendMessage(new OutgoingMessage(out, os, compress, requestId));
}
catch(Ice.LocalException ex)
{
@@ -505,8 +505,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_exception != null)
{
//
- // If there were no batch requests queued when the connection failed, we can safely
- // retry with a new connection. Otherwise, we must throw to notify the caller that
+ // If there were no batch requests queued when the connection failed, we can safely
+ // retry with a new connection. Otherwise, we must throw to notify the caller that
// some previous batch requests were not sent.
//
if(_batchStream.isEmpty())
@@ -597,6 +597,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
buffer.b.get(lastRequest);
_batchStream.resize(_batchMarker, false);
+ //
+ // Send the batch stream without the last request.
+ //
try
{
//
@@ -673,7 +676,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public synchronized void
abortBatchRequest()
{
- _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding,
+ _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding,
_batchAutoFlush);
_batchRequestNum = 0;
_batchRequestCompress = false;
@@ -711,7 +714,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
return begin_flushBatchRequestsInternal(cb);
}
-
+
public AsyncResult
begin_flushBatchRequests(IceInternal.Functional_VoidCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.LocalException> __localExceptionCb,
@@ -793,6 +796,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_batchStream.swap(out.os());
+ //
+ // Send the batch stream.
+ //
boolean sent = false;
try
{
@@ -809,7 +815,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
// Reset the batch stream.
//
- _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding,
+ _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding,
_batchAutoFlush);
_batchRequestNum = 0;
_batchRequestCompress = false;
@@ -857,6 +863,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_batchStream.swap(outAsync.__getOs());
+ //
+ // Send the batch stream.
+ //
int status;
try
{
@@ -873,7 +882,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
// Reset the batch stream.
//
- _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding,
+ _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding,
_batchAutoFlush);
_batchRequestNum = 0;
_batchRequestCompress = false;
@@ -1103,7 +1112,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
return;
}
- assert(_state < StateClosing);
_adapter = adapter;
@@ -1166,36 +1174,49 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return;
}
+ int readyOp = current.operation;
try
{
unscheduleTimeout(current.operation);
- if((current.operation & IceInternal.SocketOperation.Write) != 0 && !_writeStream.isEmpty())
+
+ int writeOp = IceInternal.SocketOperation.None;
+ int readOp = IceInternal.SocketOperation.None;
+
+ if((readyOp & IceInternal.SocketOperation.Write) != 0)
{
+ final IceInternal.Buffer buf = _writeStream.getBuffer();
if(_observer != null)
{
- observerStartWrite(_writeStream.pos());
+ observerStartWrite(buf);
}
- if(!_transceiver.write(_writeStream.getBuffer()))
+ writeOp = _transceiver.write(buf);
+ if(_observer != null && (writeOp & IceInternal.SocketOperation.Write) == 0)
{
- assert(!_writeStream.isEmpty());
- scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout());
- return;
- }
- if(_observer != null)
- {
- observerFinishWrite(_writeStream.pos());
+ observerFinishWrite(buf);
}
- assert(!_writeStream.getBuffer().b.hasRemaining());
}
- if((current.operation & IceInternal.SocketOperation.Read) != 0 && !_readStream.isEmpty())
+
+ while((readyOp & IceInternal.SocketOperation.Read) != 0)
{
+ final IceInternal.Buffer buf = _readStream.getBuffer();
+ if(_observer != null && !_readHeader)
+ {
+ observerStartRead(buf);
+ }
+
+ readOp = _transceiver.read(buf, _hasMoreData);
+ if((readOp & IceInternal.SocketOperation.Read) != 0)
+ {
+ break;
+ }
+ if(_observer != null && !_readHeader)
+ {
+ assert(!buf.b.hasRemaining());
+ observerFinishRead(buf);
+ }
+
if(_readHeader) // Read header if necessary.
{
- if(!_transceiver.read(_readStream.getBuffer(), _hasMoreData))
- {
- return;
- }
- assert(!_readStream.getBuffer().b.hasRemaining());
_readHeader = false;
if(_observer != null)
@@ -1256,34 +1277,33 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
throw new Ice.DatagramLimitException(); // The message was truncated.
}
- else
- {
- if(_observer != null)
- {
- observerStartRead(_readStream.pos());
- }
- if(!_transceiver.read(_readStream.getBuffer(), _hasMoreData))
- {
- assert(!_readStream.isEmpty());
- scheduleTimeout(IceInternal.SocketOperation.Read, _endpoint.timeout());
- return;
- }
- if(_observer != null)
- {
- observerFinishRead(_readStream.pos());
- }
- assert(!_readStream.getBuffer().b.hasRemaining());
- }
+ continue;
}
+ break;
}
-
+
+ int newOp = readOp | writeOp;
+ readyOp = readyOp & ~newOp;
+ assert(readyOp != 0 || newOp != 0);
+
if(_state <= StateNotValidated)
{
+ if(newOp != 0)
+ {
+ //
+ // Wait for all the transceiver conditions to be
+ // satisfied before continuing.
+ //
+ scheduleTimeout(newOp);
+ _threadPool.update(this, current.operation, newOp);
+ return;
+ }
+
if(_state == StateNotInitialized && !initialize(current.operation))
{
return;
}
-
+
if(_state <= StateNotValidated && !validate(current.operation))
{
return;
@@ -1307,24 +1327,41 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
else
{
- assert(_state <= StateClosing);
-
+ assert(_state <= StateClosingPending);
+
//
// We parse messages first, if we receive a close
// connection message we won't send more messages.
- //
- if((current.operation & IceInternal.SocketOperation.Read) != 0)
+ //
+ if((readyOp & IceInternal.SocketOperation.Read) != 0)
{
- info = parseMessage(current.stream); // Optimization: use the thread's stream.
+ info = new MessageInfo(current.stream); // Optimization: use the thread's stream.
+ newOp |= parseMessage(info);
}
- if((current.operation & IceInternal.SocketOperation.Write) != 0)
+ if((readyOp & IceInternal.SocketOperation.Write) != 0)
{
- sentCBs = sendNextMessage();
- if(sentCBs != null)
+ sentCBs = new java.util.LinkedList<OutgoingMessage>();
+ newOp |= sendNextMessage(sentCBs);
+ if(!sentCBs.isEmpty())
{
++_dispatchCount;
}
+ else
+ {
+ sentCBs = null;
+ }
+ }
+
+ if(_state < StateClosed)
+ {
+ scheduleTimeout(newOp);
+ _threadPool.update(this, current.operation, newOp);
+ }
+
+ if(readyOp == 0)
+ {
+ return;
}
}
}
@@ -1368,9 +1405,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
_acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
}
+
current.ioCompleted();
}
-
+
if(_dispatcher != null)
{
if(info != null && info.heartbeatCallback == null) // No need for the stream if heartbeat callback
@@ -1452,22 +1490,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
++count;
}
- if(info.invokeNum > 0)
- {
- //
- // Method invocation (or multiple invocations for batch messages)
- // must be done outside the thread synchronization, so that nested
- // calls are possible.
- //
- invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager,
- info.adapter);
-
- //
- // Don't increase count, the dispatch count is
- // decreased when the incoming reply is sent.
- //
- }
-
if(info.heartbeatCallback != null)
{
try
@@ -1480,8 +1502,24 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
++count;
}
+
+ //
+ // Method invocation (or multiple invocations for batch messages)
+ // must be done outside the thread synchronization, so that nested
+ // calls are possible.
+ //
+ if(info.invokeNum > 0)
+ {
+ invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager,
+ info.adapter);
+
+ //
+ // Don't increase count, the dispatch count is
+ // decreased when the incoming reply is sent.
+ //
+ }
}
-
+
//
// Decrease dispatch count.
//
@@ -1493,13 +1531,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_dispatchCount == 0)
{
//
- // Only initiate shutdown if not already done. It
- // might have already been done if the sent
- // callback or AMI callback was dispatched when
- // the connection was already in the closing
- // state.
+ // Only initiate shutdown if not already done. It might
+ // have already been done if the sent callback or AMI
+ // callback was dispatched when the connection was already
+ // in the closing state.
//
- if(_state == StateClosing && !_shutdownInitiated)
+ if(_state == StateClosing)
{
try
{
@@ -1523,11 +1560,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public void
finished(IceInternal.ThreadPoolCurrent current)
{
+ synchronized(this)
+ {
+ assert(_state == StateClosed);
+ unscheduleTimeout(IceInternal.SocketOperation.Read | IceInternal.SocketOperation.Write);
+ }
+
//
- // Check if the connection needs to call user callbacks. If it doesn't, we
- // can safely run finish() from this "IO thread". Otherwise, we either run
- // finish() with the dispatcher if one is set, or we promote another IO
- // thread first before calling finish().
+ // If there are no callbacks to call, we don't call ioCompleted() since we're not going
+ // to call code that will potentially block (this avoids promoting a new leader and
+ // unecessary thread creation, especially if this is called on shutdown).
//
if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty() && _callback == null)
{
@@ -1551,7 +1593,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
finish();
}
- },
+ },
this);
}
catch(java.lang.Exception ex)
@@ -1567,43 +1609,30 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public void
finish()
{
- synchronized(this)
- {
- assert(_state == StateClosed);
- unscheduleTimeout(IceInternal.SocketOperation.Read |
- IceInternal.SocketOperation.Write |
- IceInternal.SocketOperation.Connect);
- }
-
if(_startCallback != null)
{
_startCallback.connectionStartFailed(this, _exception);
_startCallback = null;
}
-
+
if(!_sendStreams.isEmpty())
{
if(!_writeStream.isEmpty())
{
//
- // Return the stream to the outgoing call. This is important for
+ // Return the stream to the outgoing call. This is important for
// retriable AMI calls which are not marshalled again.
//
OutgoingMessage message = _sendStreams.getFirst();
_writeStream.swap(message.stream);
}
-
- //
- // NOTE: for twoway requests which are not sent, finished can be called twice: the
- // first time because the outgoing is in the _sendStreams set and the second time
- // because it's either in the _requests/_asyncRequests set. This is fine, only the
- // first call should be taken into account by the implementation of finished.
- //
+
for(OutgoingMessage p : _sendStreams)
{
- if(p.requestId > 0)
+ p.finished(_exception);
+ if(p.requestId > 0) // Make sure finished isn't called twice.
{
- if(p.out != null) // Make sure finished isn't called twice.
+ if(p.out != null)
{
_requests.remove(p.requestId);
}
@@ -1612,11 +1641,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_asyncRequests.remove(p.requestId);
}
}
- p.finished(_exception);
}
_sendStreams.clear();
}
-
+
for(IceInternal.Outgoing p : _requests.values())
{
p.finished(_exception, true);
@@ -1628,7 +1656,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
p.__finished(_exception, true);
}
_asyncRequests.clear();
-
+
if(_callback != null)
{
try
@@ -1662,18 +1690,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return _toString();
}
- public java.nio.channels.SelectableChannel
+ public java.nio.channels.SelectableChannel
fd()
{
return _transceiver.fd();
}
- public boolean
- hasMoreData()
- {
- return _hasMoreData.value;
- }
-
public synchronized void
timedOut()
{
@@ -1685,7 +1707,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
setState(StateClosed, new TimeoutException());
}
- else if(_state == StateClosing)
+ else if(_state < StateClosed)
{
setState(StateClosed, new CloseTimeoutException());
}
@@ -1760,7 +1782,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_monitor = monitor;
_transceiver = transceiver;
_desc = transceiver.toString();
- _type = transceiver.type();
+ _type = transceiver.protocol();
_connector = connector;
_endpoint = endpoint;
_adapter = adapter;
@@ -1786,7 +1808,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
_nextRequestId = 1;
_batchAutoFlush = initData.properties.getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0 ? true : false;
- _batchStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding,
+ _batchStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding,
_batchAutoFlush);
_batchStreamInUse = false;
_batchRequestNum = 0;
@@ -1869,8 +1891,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private static final int StateActive = 2;
private static final int StateHolding = 3;
private static final int StateClosing = 4;
- private static final int StateClosed = 5;
- private static final int StateFinished = 6;
+ private static final int StateClosingPending = 5;
+ private static final int StateClosed = 6;
+ private static final int StateFinished = 7;
private void
setState(int state, LocalException ex)
@@ -1888,6 +1911,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_exception == null)
{
+ //
+ // If we are in closed state, an exception must be set.
+ //
+ assert(_state != StateClosed);
+
_exception = ex;
//
@@ -1903,7 +1931,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_exception instanceof ConnectionTimeoutException ||
_exception instanceof CommunicatorDestroyedException ||
_exception instanceof ObjectAdapterDeactivatedException ||
- (_exception instanceof ConnectionLostException && _state == StateClosing)))
+ (_exception instanceof ConnectionLostException && _state >= StateClosing)))
{
warning("connection exception", _exception);
}
@@ -1995,19 +2023,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
case StateClosing:
+ case StateClosingPending:
{
//
- // Can't change back from closed.
+ // Can't change back from closing pending.
//
- if(_state >= StateClosed)
+ if(_state >= StateClosingPending)
{
return;
}
- if(_state == StateHolding)
- {
- // We need to continue to read in closing state.
- _threadPool.register(this, IceInternal.SocketOperation.Read);
- }
break;
}
@@ -2020,7 +2044,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_threadPool.finish(this);
break;
}
-
+
case StateFinished:
{
assert(_state == StateClosed);
@@ -2069,7 +2093,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(oldState != newState)
{
_observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(),
- _endpoint,
+ _endpoint,
newState,
_observer);
if(_observer != null)
@@ -2089,7 +2113,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_exception instanceof ConnectionTimeoutException ||
_exception instanceof CommunicatorDestroyedException ||
_exception instanceof ObjectAdapterDeactivatedException ||
- (_exception instanceof ConnectionLostException && _state == StateClosing)))
+ (_exception instanceof ConnectionLostException && _state >= StateClosing)))
{
_observer.failed(_exception.ice_name());
}
@@ -2117,17 +2141,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
assert(_state == StateClosing);
assert(_dispatchCount == 0);
- assert(!_shutdownInitiated);
+ if(_shutdownInitiated)
+ {
+ return;
+ }
_shutdownInitiated = true;
if(!_endpoint.datagram())
{
//
- // Before we shut down, we send a close connection
- // message.
+ // Before we shut down, we send a close connection message.
//
- IceInternal.BasicStream os = new IceInternal.BasicStream(_instance,
+ IceInternal.BasicStream os = new IceInternal.BasicStream(_instance,
IceInternal.Protocol.currentProtocolEncoding);
os.writeBlob(IceInternal.Protocol.magic);
IceInternal.Protocol.currentProtocol.__write(os);
@@ -2138,23 +2164,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if((sendMessage(new OutgoingMessage(os, false, false)) & IceInternal.AsyncStatus.Sent) > 0)
{
+ setState(StateClosingPending);
+
//
- // Schedule the close timeout to wait for the peer to close the connection. If
- // the message was queued for sending, sendNextMessage will schedule the timeout
- // once all messages were sent.
+ // Notify the the transceiver of the graceful connection closure.
//
- scheduleTimeout(IceInternal.SocketOperation.Write, closeTimeout());
+ int op = _transceiver.closing(true, _exception);
+ if(op != 0)
+ {
+ scheduleTimeout(op);
+ _threadPool.register(this, op);
+ }
}
-
- //
- // The CloseConnection message should be sufficient. Closing the write
- // end of the socket is probably an artifact of how things were done
- // in IIOP. In fact, shutting down the write end of the socket causes
- // problems on Windows by preventing the peer from using the socket.
- // For example, the peer is no longer able to continue writing a large
- // message after the socket is shutdown.
- //
- //_transceiver.shutdownWrite();
}
}
@@ -2190,10 +2211,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private boolean
initialize(int operation)
{
- int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer());
+ int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), _hasMoreData);
if(s != IceInternal.SocketOperation.None)
{
- scheduleTimeout(s, connectTimeout());
+ scheduleTimeout(s);
_threadPool.update(this, operation, s);
return false;
}
@@ -2203,6 +2224,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
_desc = _transceiver.toString();
setState(StateNotValidated);
+
return true;
}
@@ -2213,7 +2235,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_adapter != null) // The server side has the active role for connection validation.
{
- if(_writeStream.size() == 0)
+ if(_writeStream.isEmpty())
{
_writeStream.writeBlob(IceInternal.Protocol.magic);
IceInternal.Protocol.currentProtocol.__write(_writeStream);
@@ -2227,22 +2249,28 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_observer != null)
{
- observerStartWrite(_writeStream.pos());
+ observerStartWrite(_writeStream.getBuffer());
}
- if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer()))
+
+ if(_writeStream.pos() != _writeStream.size())
{
- scheduleTimeout(IceInternal.SocketOperation.Write, connectTimeout());
- _threadPool.update(this, operation, IceInternal.SocketOperation.Write);
- return false;
+ int op = _transceiver.write(_writeStream.getBuffer());
+ if(op != 0)
+ {
+ scheduleTimeout(op);
+ _threadPool.update(this, operation, op);
+ return false;
+ }
}
+
if(_observer != null)
{
- observerFinishWrite(_writeStream.pos());
+ observerFinishWrite(_writeStream.getBuffer());
}
}
else // The client side has the passive role for connection validation.
{
- if(_readStream.size() == 0)
+ if(_readStream.isEmpty())
{
_readStream.resize(IceInternal.Protocol.headerSize, true);
_readStream.pos(0);
@@ -2250,17 +2278,23 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_observer != null)
{
- observerStartRead(_readStream.pos());
+ observerStartRead(_readStream.getBuffer());
}
- if(_readStream.pos() != _readStream.size() && !_transceiver.read(_readStream.getBuffer(), _hasMoreData))
+
+ if(_readStream.pos() != _readStream.size())
{
- scheduleTimeout(IceInternal.SocketOperation.Read, connectTimeout());
- _threadPool.update(this, operation, IceInternal.SocketOperation.Read);
- return false;
+ int op = _transceiver.read(_readStream.getBuffer(), _hasMoreData);
+ if(op != 0)
+ {
+ scheduleTimeout(op);
+ _threadPool.update(this, operation, op);
+ return false;
+ }
}
+
if(_observer != null)
{
- observerFinishRead(_readStream.pos());
+ observerFinishRead(_readStream.getBuffer());
}
assert(_readStream.pos() == IceInternal.Protocol.headerSize);
@@ -2276,10 +2310,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_readProtocol.__read(_readStream);
IceInternal.Protocol.checkSupportedProtocol(_readProtocol);
-
+
_readProtocolEncoding.__read(_readStream);
IceInternal.Protocol.checkSupportedProtocolEncoding(_readProtocolEncoding);
-
+
byte messageType = _readStream.readByte();
if(messageType != IceInternal.Protocol.validateConnectionMsg)
{
@@ -2301,19 +2335,28 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_writeStream.pos(0);
_readStream.resize(IceInternal.Protocol.headerSize, true);
- _readHeader = true;
_readStream.pos(0);
-
+ _readHeader = true;
+
return true;
}
- private java.util.List<OutgoingMessage>
- sendNextMessage()
+ private int
+ sendNextMessage(java.util.List<OutgoingMessage> callbacks)
{
- assert(!_sendStreams.isEmpty());
- assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());
+ if(_sendStreams.isEmpty())
+ {
+ return IceInternal.SocketOperation.None;
+ }
+ else if(_state == StateClosingPending && _writeStream.pos() == _writeStream.size())
+ {
+ // Message wasn't sent, empty the _writeStream, we're not going to send more data.
+ OutgoingMessage message = _sendStreams.getFirst();
+ _writeStream.swap(message.stream);
+ return IceInternal.SocketOperation.None;
+ }
- java.util.List<OutgoingMessage> callbacks = new java.util.LinkedList<OutgoingMessage>();
+ assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());
try
{
while(true)
@@ -2338,17 +2381,17 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
//
- // If we are in the closed state, don't continue sending.
+ // If we are in the closed state or if the close is
+ // pending, don't continue sending.
+ //
+ // This can occur if parseMessage (called before
+ // sendNextMessage by message()) closes the connection.
//
- // The connection can be in the closed state if parseMessage
- // (called before sendNextMessage by message()) closes the
- // connection.
- //
- if(_state >= StateClosed)
+ if(_state >= StateClosingPending)
{
- return callbacks;
+ return IceInternal.SocketOperation.None;
}
-
+
//
// Otherwise, prepare the next message stream for writing.
//
@@ -2375,45 +2418,50 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
if(_observer != null)
{
- observerStartWrite(_writeStream.pos());
+ observerStartWrite(_writeStream.getBuffer());
}
- if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer()))
+ if(_writeStream.pos() != _writeStream.size())
{
- assert(!_writeStream.isEmpty());
- scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout());
- return callbacks;
+ int op = _transceiver.write(_writeStream.getBuffer());
+ if(op != 0)
+ {
+ return op;
+ }
}
if(_observer != null)
{
- observerFinishWrite(_writeStream.pos());
+ observerFinishWrite(_writeStream.getBuffer());
}
}
}
catch(Ice.LocalException ex)
{
setState(StateClosed, ex);
- return callbacks;
+ return IceInternal.SocketOperation.None;
}
- assert(_writeStream.isEmpty());
- _threadPool.unregister(this, IceInternal.SocketOperation.Write);
-
//
- // If all the messages were sent and we are in the closing state, we schedule
+ // If all the messages were sent and we are in the closing state, we schedule
// the close timeout to wait for the peer to close the connection.
//
if(_state == StateClosing)
{
- scheduleTimeout(IceInternal.SocketOperation.Write, closeTimeout());
+ setState(StateClosingPending);
+ int op = _transceiver.closing(true, _exception);
+ if(op != 0)
+ {
+ return op;
+ }
}
- return callbacks;
+ return IceInternal.SocketOperation.None;
}
private int
sendMessage(OutgoingMessage message)
{
assert(_state < StateClosed);
+
if(!_sendStreams.isEmpty())
{
message.adopt();
@@ -2423,8 +2471,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
// Attempt to send the message without blocking. If the send blocks, we register
- // the connection with the selector thread or we request the caller to call
- // finishSendMessage() outside the synchronization.
+ // the connection with the selector thread.
//
assert(!message.prepared);
@@ -2434,6 +2481,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
message.stream = doCompress(stream, message.compress);
message.stream.prepareWrite();
message.prepared = true;
+ int op;
if(message.outAsync != null)
{
@@ -2444,16 +2492,21 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels);
}
+ //
+ // Send the message without blocking.
+ //
if(_observer != null)
{
- observerStartWrite(message.stream.pos());
+ observerStartWrite(message.stream.getBuffer());
}
- if(_transceiver.write(message.stream.getBuffer()))
+ op = _transceiver.write(message.stream.getBuffer());
+ if(op == 0)
{
if(_observer != null)
{
- observerFinishWrite(message.stream.pos());
+ observerFinishWrite(message.stream.getBuffer());
}
+
int status = IceInternal.AsyncStatus.Sent;
if(message.sent())
{
@@ -2466,12 +2519,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
return status;
}
+
message.adopt();
_writeStream.swap(message.stream);
_sendStreams.addLast(message);
- scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout());
- _threadPool.register(this, IceInternal.SocketOperation.Write);
+ scheduleTimeout(op);
+ _threadPool.register(this, op);
return IceInternal.AsyncStatus.Queued;
}
@@ -2549,18 +2603,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
ConnectionCallback heartbeatCallback;
}
- private MessageInfo
- parseMessage(IceInternal.BasicStream stream)
+ private int
+ parseMessage(MessageInfo info)
{
assert(_state > StateNotValidated && _state < StateClosed);
- MessageInfo info = new MessageInfo(stream);
-
_readStream.swap(info.stream);
_readStream.resize(IceInternal.Protocol.headerSize, true);
_readStream.pos(0);
_readHeader = true;
+ assert(info.stream.pos() == info.stream.size());
+
//
// Connection is validated on first message. This is only used by
// setState() to check wether or not we can print a connection
@@ -2569,15 +2623,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
_validated = true;
- assert(info.stream.pos() == info.stream.size());
-
try
{
//
// We don't need to check magic and version here. This has already
// been done by the ThreadPool which provides us with the stream.
//
- assert(info.stream.pos() == info.stream.size());
info.stream.pos(8);
byte messageType = info.stream.readByte();
info.compress = info.stream.readByte();
@@ -2611,14 +2662,24 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
else
{
- setState(StateClosed, new CloseConnectionException());
+ setState(StateClosingPending, new CloseConnectionException());
+
+ //
+ // Notify the the transceiver of the graceful connection closure.
+ //
+ int op = _transceiver.closing(false, _exception);
+ if(op != 0)
+ {
+ return op;
+ }
+ setState(StateClosed);
}
break;
}
case IceInternal.Protocol.requestMsg:
{
- if(_state == StateClosing)
+ if(_state >= StateClosing)
{
IceInternal.TraceUtil.trace("received request during closing\n" +
"(ignored by server, client will retry)",
@@ -2638,7 +2699,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
case IceInternal.Protocol.requestBatchMsg:
{
- if(_state == StateClosing)
+ if(_state >= StateClosing)
{
IceInternal.TraceUtil.trace("received batch request during closing\n" +
"(ignored by server, client will retry)",
@@ -2715,7 +2776,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- return info;
+ return _state == StateHolding ? IceInternal.SocketOperation.None : IceInternal.SocketOperation.Read;
}
private void
@@ -2787,23 +2848,72 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- private void
- scheduleTimeout(int status, int timeout)
+ private void
+ scheduleTimeout(int status)
{
+ int timeout;
+ if(_state < StateActive)
+ {
+ IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideConnectTimeout)
+ {
+ timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
+ }
+ else
+ {
+ timeout = _endpoint.timeout();
+ }
+ }
+ else if(_state < StateClosingPending)
+ {
+ if(_readHeader) // No timeout for reading the header.
+ {
+ status &= ~IceInternal.SocketOperation.Read;
+ }
+ timeout = _endpoint.timeout();
+ }
+ else
+ {
+ IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideCloseTimeout)
+ {
+ timeout = defaultsAndOverrides.overrideCloseTimeoutValue;
+ }
+ else
+ {
+ timeout = _endpoint.timeout();
+ }
+ }
+
if(timeout < 0)
{
return;
}
- if((status & IceInternal.SocketOperation.Read) != 0)
+ try
{
- _timer.schedule(_readTimeout, timeout);
- _readTimeoutScheduled = true;
+ if((status & IceInternal.SocketOperation.Read) != 0)
+ {
+ if(_readTimeoutScheduled)
+ {
+ _timer.cancel(_readTimeout);
+ }
+ _timer.schedule(_readTimeout, timeout);
+ _readTimeoutScheduled = true;
+ }
+ if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0)
+ {
+ if(_writeTimeoutScheduled)
+ {
+ _timer.cancel(_writeTimeout);
+ }
+ _timer.schedule(_writeTimeout, timeout);
+ _writeTimeoutScheduled = true;
+ }
}
- if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0)
+ catch(Throwable ex)
{
- _timer.schedule(_writeTimeout, timeout);
- _writeTimeoutScheduled = true;
+ assert(false);
}
}
@@ -2823,34 +2933,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- private int
- connectTimeout()
- {
- IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideConnectTimeout)
- {
- return defaultsAndOverrides.overrideConnectTimeoutValue;
- }
- else
- {
- return _endpoint.timeout();
- }
- }
-
- private int
- closeTimeout()
- {
- IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideCloseTimeout)
- {
- return defaultsAndOverrides.overrideCloseTimeoutValue;
- }
- else
- {
- return _endpoint.timeout();
- }
- }
-
private ConnectionInfo
initConnectionInfo()
{
@@ -2875,7 +2957,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
return connectionStateMap[state];
}
-
+
private void
warning(String msg, Exception ex)
{
@@ -2888,46 +2970,50 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
private void
- observerStartRead(int pos)
+ observerStartRead(IceInternal.Buffer buf)
{
if(_readStreamPos >= 0)
{
- _observer.receivedBytes(pos - _readStreamPos);
+ assert(!buf.empty());
+ _observer.receivedBytes(buf.b.position() - _readStreamPos);
}
- _readStreamPos = pos;
+ _readStreamPos = buf.empty() ? -1 : buf.b.position();
}
private void
- observerFinishRead(int pos)
+ observerFinishRead(IceInternal.Buffer buf)
{
if(_readStreamPos == -1)
{
return;
}
- assert(pos >= _readStreamPos);
- _observer.receivedBytes(pos - _readStreamPos);
+ assert(buf.b.position() >= _readStreamPos);
+ _observer.receivedBytes(buf.b.position() - _readStreamPos);
_readStreamPos = -1;
}
private void
- observerStartWrite(int pos)
+ observerStartWrite(IceInternal.Buffer buf)
{
if(_writeStreamPos >= 0)
{
- _observer.sentBytes(pos - _writeStreamPos);
+ assert(!buf.empty());
+ _observer.sentBytes(buf.b.position() - _writeStreamPos);
}
- _writeStreamPos = pos;
+ _writeStreamPos = buf.empty() ? -1 : buf.b.position();
}
private void
- observerFinishWrite(int pos)
+ observerFinishWrite(IceInternal.Buffer buf)
{
if(_writeStreamPos == -1)
{
return;
}
- assert(pos >= _writeStreamPos);
- _observer.sentBytes(pos - _writeStreamPos);
+ if(buf.b.position() > _writeStreamPos)
+ {
+ _observer.sentBytes(buf.b.position() - _writeStreamPos);
+ }
_writeStreamPos = -1;
}
@@ -3160,7 +3246,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private boolean _readTimeoutScheduled;
private StartCallback _startCallback = null;
- private Ice.BooleanHolder _hasMoreData = new Ice.BooleanHolder(false);
private final boolean _warn;
private final boolean _warnUdp;
@@ -3222,6 +3307,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
Ice.Instrumentation.ConnectionState.ConnectionStateActive, // StateActive
Ice.Instrumentation.ConnectionState.ConnectionStateHolding, // StateHolding
Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosing
+ Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosingPending
Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed
Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished
};