diff options
author | Mark Spruiell <mes@zeroc.com> | 2014-05-29 11:06:44 -0700 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2014-05-29 11:06:44 -0700 |
commit | 3cfd324cdcc65d2acbc7536f1652d44f66a0e896 (patch) | |
tree | 44613394c5b9c6c6eb0ec8b41e110002a58d60ea /java/src/Ice/ConnectionI.java | |
parent | Fixed Python throughput demo config (diff) | |
download | ice-3cfd324cdcc65d2acbc7536f1652d44f66a0e896.tar.bz2 ice-3cfd324cdcc65d2acbc7536f1652d44f66a0e896.tar.xz ice-3cfd324cdcc65d2acbc7536f1652d44f66a0e896.zip |
porting C++ transport changes to Java
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 612 |
1 files changed, 349 insertions, 263 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index c001462c960..fb2c0ce8a33 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -341,7 +341,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne sendRequest(IceInternal.Outgoing out, boolean compress, boolean response) throws IceInternal.LocalExceptionWrapper { - int requestId = 0; final IceInternal.BasicStream os = out.os(); if(_exception != null) @@ -363,6 +362,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // _transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax()); + int requestId = 0; if(response) { // @@ -382,7 +382,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne os.writeInt(requestId); } - out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, + out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, os.size() - IceInternal.Protocol.headerSize - 4); // @@ -418,7 +418,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response) throws IceInternal.LocalExceptionWrapper { - int requestId = 0; final IceInternal.BasicStream os = out.__getOs(); if(_exception != null) @@ -440,6 +439,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // _transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax()); + int requestId = 0; if(response) { // @@ -459,13 +459,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne os.writeInt(requestId); } - out.__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, + out.__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, os.size() - IceInternal.Protocol.headerSize - 4); int status; try { - status = sendMessage(new OutgoingMessage(out, out.__getOs(), compress, requestId)); + status = sendMessage(new OutgoingMessage(out, os, compress, requestId)); } catch(Ice.LocalException ex) { @@ -505,8 +505,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_exception != null) { // - // If there were no batch requests queued when the connection failed, we can safely - // retry with a new connection. Otherwise, we must throw to notify the caller that + // If there were no batch requests queued when the connection failed, we can safely + // retry with a new connection. Otherwise, we must throw to notify the caller that // some previous batch requests were not sent. // if(_batchStream.isEmpty()) @@ -597,6 +597,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne buffer.b.get(lastRequest); _batchStream.resize(_batchMarker, false); + // + // Send the batch stream without the last request. + // try { // @@ -673,7 +676,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public synchronized void abortBatchRequest() { - _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, + _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, _batchAutoFlush); _batchRequestNum = 0; _batchRequestCompress = false; @@ -711,7 +714,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { return begin_flushBatchRequestsInternal(cb); } - + public AsyncResult begin_flushBatchRequests(IceInternal.Functional_VoidCallback __responseCb, IceInternal.Functional_GenericCallback1<Ice.LocalException> __localExceptionCb, @@ -793,6 +796,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _batchStream.swap(out.os()); + // + // Send the batch stream. + // boolean sent = false; try { @@ -809,7 +815,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // Reset the batch stream. // - _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, + _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, _batchAutoFlush); _batchRequestNum = 0; _batchRequestCompress = false; @@ -857,6 +863,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _batchStream.swap(outAsync.__getOs()); + // + // Send the batch stream. + // int status; try { @@ -873,7 +882,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // Reset the batch stream. // - _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, + _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, _batchAutoFlush); _batchRequestNum = 0; _batchRequestCompress = false; @@ -1103,7 +1112,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { return; } - assert(_state < StateClosing); _adapter = adapter; @@ -1166,36 +1174,49 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return; } + int readyOp = current.operation; try { unscheduleTimeout(current.operation); - if((current.operation & IceInternal.SocketOperation.Write) != 0 && !_writeStream.isEmpty()) + + int writeOp = IceInternal.SocketOperation.None; + int readOp = IceInternal.SocketOperation.None; + + if((readyOp & IceInternal.SocketOperation.Write) != 0) { + final IceInternal.Buffer buf = _writeStream.getBuffer(); if(_observer != null) { - observerStartWrite(_writeStream.pos()); + observerStartWrite(buf); } - if(!_transceiver.write(_writeStream.getBuffer())) + writeOp = _transceiver.write(buf); + if(_observer != null && (writeOp & IceInternal.SocketOperation.Write) == 0) { - assert(!_writeStream.isEmpty()); - scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); - return; - } - if(_observer != null) - { - observerFinishWrite(_writeStream.pos()); + observerFinishWrite(buf); } - assert(!_writeStream.getBuffer().b.hasRemaining()); } - if((current.operation & IceInternal.SocketOperation.Read) != 0 && !_readStream.isEmpty()) + + while((readyOp & IceInternal.SocketOperation.Read) != 0) { + final IceInternal.Buffer buf = _readStream.getBuffer(); + if(_observer != null && !_readHeader) + { + observerStartRead(buf); + } + + readOp = _transceiver.read(buf, _hasMoreData); + if((readOp & IceInternal.SocketOperation.Read) != 0) + { + break; + } + if(_observer != null && !_readHeader) + { + assert(!buf.b.hasRemaining()); + observerFinishRead(buf); + } + if(_readHeader) // Read header if necessary. { - if(!_transceiver.read(_readStream.getBuffer(), _hasMoreData)) - { - return; - } - assert(!_readStream.getBuffer().b.hasRemaining()); _readHeader = false; if(_observer != null) @@ -1256,34 +1277,33 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { throw new Ice.DatagramLimitException(); // The message was truncated. } - else - { - if(_observer != null) - { - observerStartRead(_readStream.pos()); - } - if(!_transceiver.read(_readStream.getBuffer(), _hasMoreData)) - { - assert(!_readStream.isEmpty()); - scheduleTimeout(IceInternal.SocketOperation.Read, _endpoint.timeout()); - return; - } - if(_observer != null) - { - observerFinishRead(_readStream.pos()); - } - assert(!_readStream.getBuffer().b.hasRemaining()); - } + continue; } + break; } - + + int newOp = readOp | writeOp; + readyOp = readyOp & ~newOp; + assert(readyOp != 0 || newOp != 0); + if(_state <= StateNotValidated) { + if(newOp != 0) + { + // + // Wait for all the transceiver conditions to be + // satisfied before continuing. + // + scheduleTimeout(newOp); + _threadPool.update(this, current.operation, newOp); + return; + } + if(_state == StateNotInitialized && !initialize(current.operation)) { return; } - + if(_state <= StateNotValidated && !validate(current.operation)) { return; @@ -1307,24 +1327,41 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } else { - assert(_state <= StateClosing); - + assert(_state <= StateClosingPending); + // // We parse messages first, if we receive a close // connection message we won't send more messages. - // - if((current.operation & IceInternal.SocketOperation.Read) != 0) + // + if((readyOp & IceInternal.SocketOperation.Read) != 0) { - info = parseMessage(current.stream); // Optimization: use the thread's stream. + info = new MessageInfo(current.stream); // Optimization: use the thread's stream. + newOp |= parseMessage(info); } - if((current.operation & IceInternal.SocketOperation.Write) != 0) + if((readyOp & IceInternal.SocketOperation.Write) != 0) { - sentCBs = sendNextMessage(); - if(sentCBs != null) + sentCBs = new java.util.LinkedList<OutgoingMessage>(); + newOp |= sendNextMessage(sentCBs); + if(!sentCBs.isEmpty()) { ++_dispatchCount; } + else + { + sentCBs = null; + } + } + + if(_state < StateClosed) + { + scheduleTimeout(newOp); + _threadPool.update(this, current.operation, newOp); + } + + if(readyOp == 0) + { + return; } } } @@ -1368,9 +1405,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); } + current.ioCompleted(); } - + if(_dispatcher != null) { if(info != null && info.heartbeatCallback == null) // No need for the stream if heartbeat callback @@ -1452,22 +1490,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne ++count; } - if(info.invokeNum > 0) - { - // - // Method invocation (or multiple invocations for batch messages) - // must be done outside the thread synchronization, so that nested - // calls are possible. - // - invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, - info.adapter); - - // - // Don't increase count, the dispatch count is - // decreased when the incoming reply is sent. - // - } - if(info.heartbeatCallback != null) { try @@ -1480,8 +1502,24 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } ++count; } + + // + // Method invocation (or multiple invocations for batch messages) + // must be done outside the thread synchronization, so that nested + // calls are possible. + // + if(info.invokeNum > 0) + { + invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, + info.adapter); + + // + // Don't increase count, the dispatch count is + // decreased when the incoming reply is sent. + // + } } - + // // Decrease dispatch count. // @@ -1493,13 +1531,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_dispatchCount == 0) { // - // Only initiate shutdown if not already done. It - // might have already been done if the sent - // callback or AMI callback was dispatched when - // the connection was already in the closing - // state. + // Only initiate shutdown if not already done. It might + // have already been done if the sent callback or AMI + // callback was dispatched when the connection was already + // in the closing state. // - if(_state == StateClosing && !_shutdownInitiated) + if(_state == StateClosing) { try { @@ -1523,11 +1560,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public void finished(IceInternal.ThreadPoolCurrent current) { + synchronized(this) + { + assert(_state == StateClosed); + unscheduleTimeout(IceInternal.SocketOperation.Read | IceInternal.SocketOperation.Write); + } + // - // Check if the connection needs to call user callbacks. If it doesn't, we - // can safely run finish() from this "IO thread". Otherwise, we either run - // finish() with the dispatcher if one is set, or we promote another IO - // thread first before calling finish(). + // If there are no callbacks to call, we don't call ioCompleted() since we're not going + // to call code that will potentially block (this avoids promoting a new leader and + // unecessary thread creation, especially if this is called on shutdown). // if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty() && _callback == null) { @@ -1551,7 +1593,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { finish(); } - }, + }, this); } catch(java.lang.Exception ex) @@ -1567,43 +1609,30 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public void finish() { - synchronized(this) - { - assert(_state == StateClosed); - unscheduleTimeout(IceInternal.SocketOperation.Read | - IceInternal.SocketOperation.Write | - IceInternal.SocketOperation.Connect); - } - if(_startCallback != null) { _startCallback.connectionStartFailed(this, _exception); _startCallback = null; } - + if(!_sendStreams.isEmpty()) { if(!_writeStream.isEmpty()) { // - // Return the stream to the outgoing call. This is important for + // Return the stream to the outgoing call. This is important for // retriable AMI calls which are not marshalled again. // OutgoingMessage message = _sendStreams.getFirst(); _writeStream.swap(message.stream); } - - // - // NOTE: for twoway requests which are not sent, finished can be called twice: the - // first time because the outgoing is in the _sendStreams set and the second time - // because it's either in the _requests/_asyncRequests set. This is fine, only the - // first call should be taken into account by the implementation of finished. - // + for(OutgoingMessage p : _sendStreams) { - if(p.requestId > 0) + p.finished(_exception); + if(p.requestId > 0) // Make sure finished isn't called twice. { - if(p.out != null) // Make sure finished isn't called twice. + if(p.out != null) { _requests.remove(p.requestId); } @@ -1612,11 +1641,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _asyncRequests.remove(p.requestId); } } - p.finished(_exception); } _sendStreams.clear(); } - + for(IceInternal.Outgoing p : _requests.values()) { p.finished(_exception, true); @@ -1628,7 +1656,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne p.__finished(_exception, true); } _asyncRequests.clear(); - + if(_callback != null) { try @@ -1662,18 +1690,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return _toString(); } - public java.nio.channels.SelectableChannel + public java.nio.channels.SelectableChannel fd() { return _transceiver.fd(); } - public boolean - hasMoreData() - { - return _hasMoreData.value; - } - public synchronized void timedOut() { @@ -1685,7 +1707,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { setState(StateClosed, new TimeoutException()); } - else if(_state == StateClosing) + else if(_state < StateClosed) { setState(StateClosed, new CloseTimeoutException()); } @@ -1760,7 +1782,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _monitor = monitor; _transceiver = transceiver; _desc = transceiver.toString(); - _type = transceiver.type(); + _type = transceiver.protocol(); _connector = connector; _endpoint = endpoint; _adapter = adapter; @@ -1786,7 +1808,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } _nextRequestId = 1; _batchAutoFlush = initData.properties.getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0 ? true : false; - _batchStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding, + _batchStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding, _batchAutoFlush); _batchStreamInUse = false; _batchRequestNum = 0; @@ -1869,8 +1891,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private static final int StateActive = 2; private static final int StateHolding = 3; private static final int StateClosing = 4; - private static final int StateClosed = 5; - private static final int StateFinished = 6; + private static final int StateClosingPending = 5; + private static final int StateClosed = 6; + private static final int StateFinished = 7; private void setState(int state, LocalException ex) @@ -1888,6 +1911,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_exception == null) { + // + // If we are in closed state, an exception must be set. + // + assert(_state != StateClosed); + _exception = ex; // @@ -1903,7 +1931,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _exception instanceof ConnectionTimeoutException || _exception instanceof CommunicatorDestroyedException || _exception instanceof ObjectAdapterDeactivatedException || - (_exception instanceof ConnectionLostException && _state == StateClosing))) + (_exception instanceof ConnectionLostException && _state >= StateClosing))) { warning("connection exception", _exception); } @@ -1995,19 +2023,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } case StateClosing: + case StateClosingPending: { // - // Can't change back from closed. + // Can't change back from closing pending. // - if(_state >= StateClosed) + if(_state >= StateClosingPending) { return; } - if(_state == StateHolding) - { - // We need to continue to read in closing state. - _threadPool.register(this, IceInternal.SocketOperation.Read); - } break; } @@ -2020,7 +2044,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _threadPool.finish(this); break; } - + case StateFinished: { assert(_state == StateClosed); @@ -2069,7 +2093,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(oldState != newState) { _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), - _endpoint, + _endpoint, newState, _observer); if(_observer != null) @@ -2089,7 +2113,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _exception instanceof ConnectionTimeoutException || _exception instanceof CommunicatorDestroyedException || _exception instanceof ObjectAdapterDeactivatedException || - (_exception instanceof ConnectionLostException && _state == StateClosing))) + (_exception instanceof ConnectionLostException && _state >= StateClosing))) { _observer.failed(_exception.ice_name()); } @@ -2117,17 +2141,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { assert(_state == StateClosing); assert(_dispatchCount == 0); - assert(!_shutdownInitiated); + if(_shutdownInitiated) + { + return; + } _shutdownInitiated = true; if(!_endpoint.datagram()) { // - // Before we shut down, we send a close connection - // message. + // Before we shut down, we send a close connection message. // - IceInternal.BasicStream os = new IceInternal.BasicStream(_instance, + IceInternal.BasicStream os = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding); os.writeBlob(IceInternal.Protocol.magic); IceInternal.Protocol.currentProtocol.__write(os); @@ -2138,23 +2164,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if((sendMessage(new OutgoingMessage(os, false, false)) & IceInternal.AsyncStatus.Sent) > 0) { + setState(StateClosingPending); + // - // Schedule the close timeout to wait for the peer to close the connection. If - // the message was queued for sending, sendNextMessage will schedule the timeout - // once all messages were sent. + // Notify the the transceiver of the graceful connection closure. // - scheduleTimeout(IceInternal.SocketOperation.Write, closeTimeout()); + int op = _transceiver.closing(true, _exception); + if(op != 0) + { + scheduleTimeout(op); + _threadPool.register(this, op); + } } - - // - // The CloseConnection message should be sufficient. Closing the write - // end of the socket is probably an artifact of how things were done - // in IIOP. In fact, shutting down the write end of the socket causes - // problems on Windows by preventing the peer from using the socket. - // For example, the peer is no longer able to continue writing a large - // message after the socket is shutdown. - // - //_transceiver.shutdownWrite(); } } @@ -2190,10 +2211,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private boolean initialize(int operation) { - int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer()); + int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), _hasMoreData); if(s != IceInternal.SocketOperation.None) { - scheduleTimeout(s, connectTimeout()); + scheduleTimeout(s); _threadPool.update(this, operation, s); return false; } @@ -2203,6 +2224,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // _desc = _transceiver.toString(); setState(StateNotValidated); + return true; } @@ -2213,7 +2235,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_adapter != null) // The server side has the active role for connection validation. { - if(_writeStream.size() == 0) + if(_writeStream.isEmpty()) { _writeStream.writeBlob(IceInternal.Protocol.magic); IceInternal.Protocol.currentProtocol.__write(_writeStream); @@ -2227,22 +2249,28 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_observer != null) { - observerStartWrite(_writeStream.pos()); + observerStartWrite(_writeStream.getBuffer()); } - if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer())) + + if(_writeStream.pos() != _writeStream.size()) { - scheduleTimeout(IceInternal.SocketOperation.Write, connectTimeout()); - _threadPool.update(this, operation, IceInternal.SocketOperation.Write); - return false; + int op = _transceiver.write(_writeStream.getBuffer()); + if(op != 0) + { + scheduleTimeout(op); + _threadPool.update(this, operation, op); + return false; + } } + if(_observer != null) { - observerFinishWrite(_writeStream.pos()); + observerFinishWrite(_writeStream.getBuffer()); } } else // The client side has the passive role for connection validation. { - if(_readStream.size() == 0) + if(_readStream.isEmpty()) { _readStream.resize(IceInternal.Protocol.headerSize, true); _readStream.pos(0); @@ -2250,17 +2278,23 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_observer != null) { - observerStartRead(_readStream.pos()); + observerStartRead(_readStream.getBuffer()); } - if(_readStream.pos() != _readStream.size() && !_transceiver.read(_readStream.getBuffer(), _hasMoreData)) + + if(_readStream.pos() != _readStream.size()) { - scheduleTimeout(IceInternal.SocketOperation.Read, connectTimeout()); - _threadPool.update(this, operation, IceInternal.SocketOperation.Read); - return false; + int op = _transceiver.read(_readStream.getBuffer(), _hasMoreData); + if(op != 0) + { + scheduleTimeout(op); + _threadPool.update(this, operation, op); + return false; + } } + if(_observer != null) { - observerFinishRead(_readStream.pos()); + observerFinishRead(_readStream.getBuffer()); } assert(_readStream.pos() == IceInternal.Protocol.headerSize); @@ -2276,10 +2310,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _readProtocol.__read(_readStream); IceInternal.Protocol.checkSupportedProtocol(_readProtocol); - + _readProtocolEncoding.__read(_readStream); IceInternal.Protocol.checkSupportedProtocolEncoding(_readProtocolEncoding); - + byte messageType = _readStream.readByte(); if(messageType != IceInternal.Protocol.validateConnectionMsg) { @@ -2301,19 +2335,28 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _writeStream.pos(0); _readStream.resize(IceInternal.Protocol.headerSize, true); - _readHeader = true; _readStream.pos(0); - + _readHeader = true; + return true; } - private java.util.List<OutgoingMessage> - sendNextMessage() + private int + sendNextMessage(java.util.List<OutgoingMessage> callbacks) { - assert(!_sendStreams.isEmpty()); - assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size()); + if(_sendStreams.isEmpty()) + { + return IceInternal.SocketOperation.None; + } + else if(_state == StateClosingPending && _writeStream.pos() == _writeStream.size()) + { + // Message wasn't sent, empty the _writeStream, we're not going to send more data. + OutgoingMessage message = _sendStreams.getFirst(); + _writeStream.swap(message.stream); + return IceInternal.SocketOperation.None; + } - java.util.List<OutgoingMessage> callbacks = new java.util.LinkedList<OutgoingMessage>(); + assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size()); try { while(true) @@ -2338,17 +2381,17 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } // - // If we are in the closed state, don't continue sending. + // If we are in the closed state or if the close is + // pending, don't continue sending. + // + // This can occur if parseMessage (called before + // sendNextMessage by message()) closes the connection. // - // The connection can be in the closed state if parseMessage - // (called before sendNextMessage by message()) closes the - // connection. - // - if(_state >= StateClosed) + if(_state >= StateClosingPending) { - return callbacks; + return IceInternal.SocketOperation.None; } - + // // Otherwise, prepare the next message stream for writing. // @@ -2375,45 +2418,50 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // if(_observer != null) { - observerStartWrite(_writeStream.pos()); + observerStartWrite(_writeStream.getBuffer()); } - if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer())) + if(_writeStream.pos() != _writeStream.size()) { - assert(!_writeStream.isEmpty()); - scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); - return callbacks; + int op = _transceiver.write(_writeStream.getBuffer()); + if(op != 0) + { + return op; + } } if(_observer != null) { - observerFinishWrite(_writeStream.pos()); + observerFinishWrite(_writeStream.getBuffer()); } } } catch(Ice.LocalException ex) { setState(StateClosed, ex); - return callbacks; + return IceInternal.SocketOperation.None; } - assert(_writeStream.isEmpty()); - _threadPool.unregister(this, IceInternal.SocketOperation.Write); - // - // If all the messages were sent and we are in the closing state, we schedule + // If all the messages were sent and we are in the closing state, we schedule // the close timeout to wait for the peer to close the connection. // if(_state == StateClosing) { - scheduleTimeout(IceInternal.SocketOperation.Write, closeTimeout()); + setState(StateClosingPending); + int op = _transceiver.closing(true, _exception); + if(op != 0) + { + return op; + } } - return callbacks; + return IceInternal.SocketOperation.None; } private int sendMessage(OutgoingMessage message) { assert(_state < StateClosed); + if(!_sendStreams.isEmpty()) { message.adopt(); @@ -2423,8 +2471,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // Attempt to send the message without blocking. If the send blocks, we register - // the connection with the selector thread or we request the caller to call - // finishSendMessage() outside the synchronization. + // the connection with the selector thread. // assert(!message.prepared); @@ -2434,6 +2481,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne message.stream = doCompress(stream, message.compress); message.stream.prepareWrite(); message.prepared = true; + int op; if(message.outAsync != null) { @@ -2444,16 +2492,21 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels); } + // + // Send the message without blocking. + // if(_observer != null) { - observerStartWrite(message.stream.pos()); + observerStartWrite(message.stream.getBuffer()); } - if(_transceiver.write(message.stream.getBuffer())) + op = _transceiver.write(message.stream.getBuffer()); + if(op == 0) { if(_observer != null) { - observerFinishWrite(message.stream.pos()); + observerFinishWrite(message.stream.getBuffer()); } + int status = IceInternal.AsyncStatus.Sent; if(message.sent()) { @@ -2466,12 +2519,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } return status; } + message.adopt(); _writeStream.swap(message.stream); _sendStreams.addLast(message); - scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); - _threadPool.register(this, IceInternal.SocketOperation.Write); + scheduleTimeout(op); + _threadPool.register(this, op); return IceInternal.AsyncStatus.Queued; } @@ -2549,18 +2603,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne ConnectionCallback heartbeatCallback; } - private MessageInfo - parseMessage(IceInternal.BasicStream stream) + private int + parseMessage(MessageInfo info) { assert(_state > StateNotValidated && _state < StateClosed); - MessageInfo info = new MessageInfo(stream); - _readStream.swap(info.stream); _readStream.resize(IceInternal.Protocol.headerSize, true); _readStream.pos(0); _readHeader = true; + assert(info.stream.pos() == info.stream.size()); + // // Connection is validated on first message. This is only used by // setState() to check wether or not we can print a connection @@ -2569,15 +2623,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // _validated = true; - assert(info.stream.pos() == info.stream.size()); - try { // // We don't need to check magic and version here. This has already // been done by the ThreadPool which provides us with the stream. // - assert(info.stream.pos() == info.stream.size()); info.stream.pos(8); byte messageType = info.stream.readByte(); info.compress = info.stream.readByte(); @@ -2611,14 +2662,24 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } else { - setState(StateClosed, new CloseConnectionException()); + setState(StateClosingPending, new CloseConnectionException()); + + // + // Notify the the transceiver of the graceful connection closure. + // + int op = _transceiver.closing(false, _exception); + if(op != 0) + { + return op; + } + setState(StateClosed); } break; } case IceInternal.Protocol.requestMsg: { - if(_state == StateClosing) + if(_state >= StateClosing) { IceInternal.TraceUtil.trace("received request during closing\n" + "(ignored by server, client will retry)", @@ -2638,7 +2699,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne case IceInternal.Protocol.requestBatchMsg: { - if(_state == StateClosing) + if(_state >= StateClosing) { IceInternal.TraceUtil.trace("received batch request during closing\n" + "(ignored by server, client will retry)", @@ -2715,7 +2776,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - return info; + return _state == StateHolding ? IceInternal.SocketOperation.None : IceInternal.SocketOperation.Read; } private void @@ -2787,23 +2848,72 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - private void - scheduleTimeout(int status, int timeout) + private void + scheduleTimeout(int status) { + int timeout; + if(_state < StateActive) + { + IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideConnectTimeout) + { + timeout = defaultsAndOverrides.overrideConnectTimeoutValue; + } + else + { + timeout = _endpoint.timeout(); + } + } + else if(_state < StateClosingPending) + { + if(_readHeader) // No timeout for reading the header. + { + status &= ~IceInternal.SocketOperation.Read; + } + timeout = _endpoint.timeout(); + } + else + { + IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideCloseTimeout) + { + timeout = defaultsAndOverrides.overrideCloseTimeoutValue; + } + else + { + timeout = _endpoint.timeout(); + } + } + if(timeout < 0) { return; } - if((status & IceInternal.SocketOperation.Read) != 0) + try { - _timer.schedule(_readTimeout, timeout); - _readTimeoutScheduled = true; + if((status & IceInternal.SocketOperation.Read) != 0) + { + if(_readTimeoutScheduled) + { + _timer.cancel(_readTimeout); + } + _timer.schedule(_readTimeout, timeout); + _readTimeoutScheduled = true; + } + if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0) + { + if(_writeTimeoutScheduled) + { + _timer.cancel(_writeTimeout); + } + _timer.schedule(_writeTimeout, timeout); + _writeTimeoutScheduled = true; + } } - if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0) + catch(Throwable ex) { - _timer.schedule(_writeTimeout, timeout); - _writeTimeoutScheduled = true; + assert(false); } } @@ -2823,34 +2933,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - private int - connectTimeout() - { - IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideConnectTimeout) - { - return defaultsAndOverrides.overrideConnectTimeoutValue; - } - else - { - return _endpoint.timeout(); - } - } - - private int - closeTimeout() - { - IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideCloseTimeout) - { - return defaultsAndOverrides.overrideCloseTimeoutValue; - } - else - { - return _endpoint.timeout(); - } - } - private ConnectionInfo initConnectionInfo() { @@ -2875,7 +2957,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { return connectionStateMap[state]; } - + private void warning(String msg, Exception ex) { @@ -2888,46 +2970,50 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } private void - observerStartRead(int pos) + observerStartRead(IceInternal.Buffer buf) { if(_readStreamPos >= 0) { - _observer.receivedBytes(pos - _readStreamPos); + assert(!buf.empty()); + _observer.receivedBytes(buf.b.position() - _readStreamPos); } - _readStreamPos = pos; + _readStreamPos = buf.empty() ? -1 : buf.b.position(); } private void - observerFinishRead(int pos) + observerFinishRead(IceInternal.Buffer buf) { if(_readStreamPos == -1) { return; } - assert(pos >= _readStreamPos); - _observer.receivedBytes(pos - _readStreamPos); + assert(buf.b.position() >= _readStreamPos); + _observer.receivedBytes(buf.b.position() - _readStreamPos); _readStreamPos = -1; } private void - observerStartWrite(int pos) + observerStartWrite(IceInternal.Buffer buf) { if(_writeStreamPos >= 0) { - _observer.sentBytes(pos - _writeStreamPos); + assert(!buf.empty()); + _observer.sentBytes(buf.b.position() - _writeStreamPos); } - _writeStreamPos = pos; + _writeStreamPos = buf.empty() ? -1 : buf.b.position(); } private void - observerFinishWrite(int pos) + observerFinishWrite(IceInternal.Buffer buf) { if(_writeStreamPos == -1) { return; } - assert(pos >= _writeStreamPos); - _observer.sentBytes(pos - _writeStreamPos); + if(buf.b.position() > _writeStreamPos) + { + _observer.sentBytes(buf.b.position() - _writeStreamPos); + } _writeStreamPos = -1; } @@ -3160,7 +3246,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private boolean _readTimeoutScheduled; private StartCallback _startCallback = null; - private Ice.BooleanHolder _hasMoreData = new Ice.BooleanHolder(false); private final boolean _warn; private final boolean _warnUdp; @@ -3222,6 +3307,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne Ice.Instrumentation.ConnectionState.ConnectionStateActive, // StateActive Ice.Instrumentation.ConnectionState.ConnectionStateHolding, // StateHolding Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosing + Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosingPending Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished }; |