diff options
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 1107 |
1 files changed, 414 insertions, 693 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index b304efa82b5..00645ce8dc9 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -14,30 +14,31 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public interface StartCallback { void connectionStartCompleted(ConnectionI connection); + void connectionStartFailed(ConnectionI connection, Ice.LocalException ex); } private class TimeoutCallback implements Runnable { @Override - public void - run() + public void run() { timedOut(); } } - public void - start(StartCallback callback) + public void start(StartCallback callback) { try { synchronized(this) { - if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed. + // The connection might already be closed if the communicator + // was destroyed. + if(_state >= StateClosed) { - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None)) @@ -62,18 +63,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne callback.connectionStartCompleted(this); } - public void - startAndWait() - throws InterruptedException + public void startAndWait() throws InterruptedException { try { synchronized(this) { - if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed. + // The connection might already be closed if the communicator + // was destroyed. + if(_state >= StateClosed) { - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None)) @@ -85,8 +86,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_state >= StateClosing) { - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } } @@ -104,8 +105,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - public synchronized void - activate() + public synchronized void activate() { if(_state <= StateNotValidated) { @@ -120,8 +120,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne setState(StateActive); } - public synchronized void - hold() + public synchronized void hold() { if(_state <= StateNotValidated) { @@ -135,8 +134,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public final static int ObjectAdapterDeactivated = 0; public final static int CommunicatorDestroyed = 1; - synchronized public void - destroy(int reason) + synchronized public void destroy(int reason) { switch(reason) { @@ -155,8 +153,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - synchronized public void - close(boolean force) + synchronized public void close(boolean force) { if(force) { @@ -171,7 +168,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // requests to be retried, regardless of whether the // server has processed them or not. // - while(!_requests.isEmpty() || !_asyncRequests.isEmpty()) + while(!_asyncRequests.isEmpty()) { try { @@ -187,37 +184,32 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - public synchronized boolean - isActiveOrHolding() + public synchronized boolean isActiveOrHolding() { return _state > StateNotValidated && _state < StateClosing; } - public synchronized boolean - isFinished() + public synchronized boolean isFinished() { if(_state != StateFinished || _dispatchCount != 0) { return false; } - assert(_state == StateFinished); + assert (_state == StateFinished); return true; } - public synchronized void - throwException() + public synchronized void throwException() { if(_exception != null) { - assert(_state >= StateClosing); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_state >= StateClosing); + throw (Ice.LocalException) _exception.fillInStackTrace(); } } - public synchronized void - waitUntilHolding() - throws InterruptedException + public synchronized void waitUntilHolding() throws InterruptedException { while(_state < StateHolding || _dispatchCount > 0) { @@ -225,9 +217,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - public synchronized void - waitUntilFinished() - throws InterruptedException + public synchronized void waitUntilFinished() throws InterruptedException { // // We wait indefinitely until the connection is finished and all @@ -240,7 +230,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne wait(); } - assert(_state == StateFinished); + assert (_state == StateFinished); // // Clear the OA. See bug 1673 for the details of why this is necessary. @@ -248,19 +238,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _adapter = null; } - synchronized public void - updateObserver() + synchronized public void updateObserver() { if(_state < StateNotValidated || _state > StateClosed) { return; } - assert(_instance.getObserver() != null); - _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), - _endpoint, - toConnectionState(_state), - _observer); + assert (_instance.getObserver() != null); + _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), _endpoint, + toConnectionState(_state), _observer); if(_observer != null) { _observer.attach(); @@ -272,8 +259,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - synchronized public void - monitor(long now, IceInternal.ACMConfig acm) + synchronized public void monitor(long now, IceInternal.ACMConfig acm) { if(_state != StateActive) { @@ -285,7 +271,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // If writing or reading, nothing to do, the connection // timeout will kick-in if writes or reads don't progress. - // This check is necessary because the actitivy timer is + // This check is necessary because the activity timer is // only set when a message is fully read/written. // return; @@ -317,7 +303,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(acm.close != ACMClose.CloseOff && now >= (_acmLastActivity + acm.timeout)) { if(acm.close == ACMClose.CloseOnIdleForceful || - (acm.close != ACMClose.CloseOnIdle && (!_requests.isEmpty() || !_asyncRequests.isEmpty()))) + (acm.close != ACMClose.CloseOnIdle && (!_asyncRequests.isEmpty()))) { // // Close the connection if we didn't receive a heartbeat in @@ -325,8 +311,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // setState(StateClosed, new ConnectionTimeoutException()); } - else if(acm.close != ACMClose.CloseOnInvocation && - _dispatchCount == 0 && _batchStream.isEmpty() && _requests.isEmpty() && _asyncRequests.isEmpty()) + else if(acm.close != ACMClose.CloseOnInvocation && _dispatchCount == 0 && _batchStream.isEmpty() && + _asyncRequests.isEmpty()) { // // The connection is idle, close it. @@ -336,88 +322,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - synchronized public boolean - sendRequest(IceInternal.Outgoing out, boolean compress, boolean response) - throws IceInternal.RetryException - { - final IceInternal.BasicStream os = out.os(); - - if(_exception != null) - { - // - // If the connection is closed before we even have a chance - // to send our request, we always try to send the request - // again. - // - throw new IceInternal.RetryException((Ice.LocalException)_exception.fillInStackTrace()); - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - // - // Ensure the message isn't bigger than what we can send with the - // transport. - // - _transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax()); - - int requestId = 0; - if(response) - { - // - // Create a new unique request ID. - // - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } - - // - // Fill in the request ID. - // - os.pos(IceInternal.Protocol.headerSize); - os.writeInt(requestId); - } - - out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, - os.size() - IceInternal.Protocol.headerSize - 4); - - // - // Send the message. If it can't be sent without blocking the message is added - // to _sendStreams and it will be sent by the selector thread or by this thread - // if flush is true. - // - boolean sent = false; - try - { - OutgoingMessage message = new OutgoingMessage(out, out.os(), compress, requestId); - sent = (sendMessage(message) & IceInternal.AsyncStatus.Sent) > 0; - } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); - } - - if(response) - { - // - // Add to the requests map. - // - _requests.put(requestId, out); - } - - return sent; // The request was sent. - } - - synchronized public int - sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response) - throws IceInternal.RetryException + synchronized public int sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response) + throws IceInternal.RetryException { - final IceInternal.BasicStream os = out.__getOs(); + final IceInternal.BasicStream os = out.getOs(); if(_exception != null) { @@ -426,11 +334,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // to send our request, we always try to send the request // again. // - throw new IceInternal.RetryException((Ice.LocalException)_exception.fillInStackTrace()); + throw new IceInternal.RetryException((Ice.LocalException) _exception.fillInStackTrace()); } - assert(_state > StateNotValidated); - assert(_state < StateClosing); + assert (_state > StateNotValidated); + assert (_state < StateClosing); // // Ensure the message isn't bigger than what we can send with the @@ -458,8 +366,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne os.writeInt(requestId); } - out.__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, - os.size() - IceInternal.Protocol.headerSize - 4); + out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, os.size() - + IceInternal.Protocol.headerSize - 4); int status; try @@ -469,8 +377,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne catch(Ice.LocalException ex) { setState(StateClosed, ex); - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } if(response) @@ -483,31 +391,30 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return status; } - public synchronized void - prepareBatchRequest(IceInternal.BasicStream os) - throws IceInternal.RetryException + public synchronized void prepareBatchRequest(IceInternal.BasicStream os) throws IceInternal.RetryException { waitBatchStreamInUse(); if(_exception != null) { // - // If there were no batch requests queued when the connection failed, we can safely - // retry with a new connection. Otherwise, we must throw to notify the caller that - // some previous batch requests were not sent. + // If there were no batch requests queued when the connection + // failed, we can safely retry with a new connection. Otherwise, we + // must throw to notify the caller that some previous batch requests + // were not sent. // if(_batchStream.isEmpty()) { - throw new IceInternal.RetryException((Ice.LocalException)_exception.fillInStackTrace()); + throw new IceInternal.RetryException((Ice.LocalException) _exception.fillInStackTrace()); } else { - throw (Ice.LocalException)_exception.fillInStackTrace(); + throw (Ice.LocalException) _exception.fillInStackTrace(); } } - assert(_state > StateNotValidated); - assert(_state < StateClosing); + assert (_state > StateNotValidated); + assert (_state < StateClosing); if(_batchStream.isEmpty()) { @@ -532,8 +439,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // } - public void - finishBatchRequest(IceInternal.BasicStream os, boolean compress) + public void finishBatchRequest(IceInternal.BasicStream os, boolean compress) { try { @@ -553,9 +459,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_batchAutoFlush) { // - // Throw memory limit exception if the first message added causes us to go over - // limit. Otherwise put aside the marshalled message that caused limit to be - // exceeded and rollback stream to the marker. + // Throw memory limit exception if the first message added + // causes us to go over limit. Otherwise put aside the + // marshalled message that caused limit to be exceeded and + // rollback stream to the marker. + // try { _transceiver.checkSendSize(_batchStream.getBuffer(), _instance.messageSizeMax()); @@ -601,31 +509,32 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne catch(Ice.LocalException ex) { setState(StateClosed, ex); - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } // // Reset the batch stream. // _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, - _batchAutoFlush); + _batchAutoFlush); _batchRequestNum = 0; _batchRequestCompress = false; _batchMarker = 0; // - // Check again if the last request doesn't exceed the maximum message size. + // Check again if the last request doesn't exceed the + // maximum message size. // - if(IceInternal.Protocol.requestBatchHdr.length + lastRequest.length > _instance.messageSizeMax()) + if(IceInternal.Protocol.requestBatchHdr.length + lastRequest.length > _instance.messageSizeMax()) { - IceInternal.Ex.throwMemoryLimitException( - IceInternal.Protocol.requestBatchHdr.length + lastRequest.length, - _instance.messageSizeMax()); + IceInternal.Ex.throwMemoryLimitException(IceInternal.Protocol.requestBatchHdr.length + + lastRequest.length, _instance.messageSizeMax()); } // - // Start a new batch with the last message that caused us to go over the limit. + // Start a new batch with the last message that caused us to + // go over the limit. // _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr); _batchStream.writeBlob(lastRequest); @@ -637,7 +546,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne ++_batchRequestNum; // - // We compress the whole batch if there is at least one compressed + // We compress the whole batch if there is at least one + // compressed // message. // if(compress) @@ -648,7 +558,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // Notify about the batch stream not being in use anymore. // - assert(_batchStreamInUse); + assert (_batchStreamInUse); _batchStreamInUse = false; notifyAll(); } @@ -660,173 +570,103 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - public synchronized void - abortBatchRequest() + public synchronized void abortBatchRequest() { _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, - _batchAutoFlush); + _batchAutoFlush); _batchRequestNum = 0; _batchRequestCompress = false; _batchMarker = 0; - assert(_batchStreamInUse); + assert (_batchStreamInUse); _batchStreamInUse = false; notifyAll(); } @Override - public void - flushBatchRequests() + public void flushBatchRequests() { - IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(this, _instance, __flushBatchRequests_name); - try - { - out.invoke(); - } - catch(InterruptedException ex) - { - throw new Ice.OperationInterruptedException(); - } + end_flushBatchRequests(begin_flushBatchRequests()); } private static final String __flushBatchRequests_name = "flushBatchRequests"; @Override - public Ice.AsyncResult - begin_flushBatchRequests() + public Ice.AsyncResult begin_flushBatchRequests() { return begin_flushBatchRequestsInternal(null); } @Override - public Ice.AsyncResult - begin_flushBatchRequests(Callback cb) + public Ice.AsyncResult begin_flushBatchRequests(Callback cb) { return begin_flushBatchRequestsInternal(cb); } @Override - public Ice.AsyncResult - begin_flushBatchRequests(Callback_Connection_flushBatchRequests cb) + public Ice.AsyncResult begin_flushBatchRequests(Callback_Connection_flushBatchRequests cb) { return begin_flushBatchRequestsInternal(cb); } @Override - public AsyncResult - begin_flushBatchRequests(IceInternal.Functional_VoidCallback __responseCb, - IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, - IceInternal.Functional_BoolCallback __sentCb) + public AsyncResult begin_flushBatchRequests(IceInternal.Functional_VoidCallback __responseCb, + IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, + IceInternal.Functional_BoolCallback __sentCb) { - return begin_flushBatchRequestsInternal( - new IceInternal.Functional_CallbackBase(false, __exceptionCb, __sentCb) + return begin_flushBatchRequestsInternal(new IceInternal.Functional_CallbackBase(false, __exceptionCb, __sentCb) + { + @Override + public final void __completed(AsyncResult __result) + { + try { - @Override - public final void __completed(AsyncResult __result) - { - try - { - __result.getConnection().end_flushBatchRequests(__result); - } - catch(Exception __ex) - { - __exceptionCb.apply(__ex); - } - } - }); + __result.getConnection().end_flushBatchRequests(__result); + } + catch(Exception __ex) + { + __exceptionCb.apply(__ex); + } + } + }); } - private Ice.AsyncResult - begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb) + private Ice.AsyncResult begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb) { - IceInternal.ConnectionBatchOutgoingAsync result = - new IceInternal.ConnectionBatchOutgoingAsync(this, _communicator, _instance, __flushBatchRequests_name, cb); + IceInternal.ConnectionBatchOutgoingAsync result = new IceInternal.ConnectionBatchOutgoingAsync(this, + _communicator, _instance, __flushBatchRequests_name, cb); try { result.__invoke(); } catch(LocalException __ex) { - result.__invokeExceptionAsync(__ex); + result.invokeExceptionAsync(__ex); } return result; } @Override - public void - end_flushBatchRequests(AsyncResult r) + public void end_flushBatchRequests(AsyncResult ir) { - AsyncResult.__check(r, this, __flushBatchRequests_name); + IceInternal.AsyncResultI r = (IceInternal.AsyncResultI) ir; + IceInternal.AsyncResultI.check(r, this, __flushBatchRequests_name); r.__wait(); } - synchronized public boolean - flushBatchRequests(IceInternal.BatchOutgoing out) - { - waitBatchStreamInUse(); - if(_exception != null) - { - throw (Ice.LocalException)_exception.fillInStackTrace(); - } - - if(_batchRequestNum == 0) - { - out.sent(); - return true; - } - - // - // Fill in the number of requests in the batch. - // - _batchStream.pos(IceInternal.Protocol.headerSize); - _batchStream.writeInt(_batchRequestNum); - - out.attachRemoteObserver(initConnectionInfo(), _endpoint, - _batchStream.size() - IceInternal.Protocol.headerSize - 4); - - _batchStream.swap(out.os()); - - // - // Send the batch stream. - // - boolean sent = false; - try - { - OutgoingMessage message = new OutgoingMessage(out, out.os(), _batchRequestCompress, 0); - sent = (sendMessage(message) & IceInternal.AsyncStatus.Sent) > 0; - } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); - } - - // - // Reset the batch stream. - // - _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, - _batchAutoFlush); - _batchRequestNum = 0; - _batchRequestCompress = false; - _batchMarker = 0; - return sent; - } - - synchronized public int - flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync) + synchronized public int flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync) { waitBatchStreamInUse(); if(_exception != null) { - throw (Ice.LocalException)_exception.fillInStackTrace(); + throw (Ice.LocalException) _exception.fillInStackTrace(); } if(_batchRequestNum == 0) { int status = IceInternal.AsyncStatus.Sent; - if(outAsync.__sent()) + if(outAsync.sent()) { status |= IceInternal.AsyncStatus.InvokeSentCallback; } @@ -839,10 +679,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _batchStream.pos(IceInternal.Protocol.headerSize); _batchStream.writeInt(_batchRequestNum); - outAsync.__attachRemoteObserver(initConnectionInfo(), _endpoint, 0, - _batchStream.size() - IceInternal.Protocol.headerSize - 4); + outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0, _batchStream.size() - + IceInternal.Protocol.headerSize - 4); - _batchStream.swap(outAsync.__getOs()); + _batchStream.swap(outAsync.getOs()); // // Send the batch stream. @@ -850,21 +690,21 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne int status; try { - OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.__getOs(), _batchRequestCompress, 0); + OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.getOs(), _batchRequestCompress, 0); status = sendMessage(message); } catch(Ice.LocalException ex) { setState(StateClosed, ex); - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } // // Reset the batch stream. // _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, - _batchAutoFlush); + _batchAutoFlush); _batchRequestNum = 0; _batchRequestCompress = false; _batchMarker = 0; @@ -872,8 +712,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - synchronized public void - setCallback(ConnectionCallback callback) + synchronized public void setCallback(ConnectionCallback callback) { if(_state > StateClosing) { @@ -883,8 +722,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - synchronized public void - setACM(Ice.IntOptional timeout, Ice.Optional<ACMClose> close, Ice.Optional<ACMHeartbeat> heartbeat) + synchronized public void setACM(Ice.IntOptional timeout, Ice.Optional<ACMClose> close, + Ice.Optional<ACMHeartbeat> heartbeat) { if(_monitor != null) { @@ -893,10 +732,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _monitor.remove(this); } _monitor = _monitor.acm(timeout, close, heartbeat); - + if(_monitor.getACM().timeout <= 0) { - _acmLastActivity = -1; // Disable the recording of last activity. + _acmLastActivity = -1; // Disable the recording of last + // activity. } else if(_state == StateActive && _acmLastActivity == -1) { @@ -912,59 +752,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - synchronized public Ice.ACM - getACM() + synchronized public Ice.ACM getACM() { return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff); } - synchronized public boolean - requestCanceled(IceInternal.OutgoingMessageCallback out, Ice.LocalException ex) - { - java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); - while(it.hasNext()) - { - OutgoingMessage o = it.next(); - if(o.out == out) - { - if(o.requestId > 0) - { - _requests.remove(o.requestId); - } - - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // - o.timedOut(); - if(o != _sendStreams.getFirst()) - { - it.remove(); - } - out.finished(ex); - return true; // We're done. - } - } - - if(out instanceof IceInternal.Outgoing) - { - IceInternal.Outgoing o = (IceInternal.Outgoing)out; - java.util.Iterator<IceInternal.Outgoing> it2 = _requests.values().iterator(); - while(it2.hasNext()) - { - if(it2.next() == o) - { - o.finished(ex); - it2.remove(); - return true; // We're done. - } - } - } - return false; - } - - public boolean - asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex) + synchronized public boolean asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync, + Ice.LocalException ex) { java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); while(it.hasNext()) @@ -978,42 +772,44 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. + // If the request is being sent, don't remove it from the send + // streams, it will be removed once the sending is finished. + // + // Note that since we swapped the message stream to _writeStream + // it's fine if the OutgoingAsync output stream is released (and + // as long as canceled requests cannot be retried). // o.timedOut(); if(o != _sendStreams.getFirst()) { it.remove(); } - outAsync.__dispatchInvocationCancel(ex, _threadPool, this); + outAsync.dispatchInvocationCancel(ex, _threadPool, this); return true; // We're done } } if(outAsync instanceof IceInternal.OutgoingAsync) { - IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)outAsync; + IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync) outAsync; java.util.Iterator<IceInternal.OutgoingAsync> it2 = _asyncRequests.values().iterator(); while(it2.hasNext()) { if(it2.next() == o) { it2.remove(); - outAsync.__dispatchInvocationCancel(ex, _threadPool, this); + outAsync.dispatchInvocationCancel(ex, _threadPool, this); return true; // We're done. } } } - return false; } @Override - synchronized public void - sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag) + synchronized public void sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag) { - assert(_state > StateNotValidated); + assert (_state > StateNotValidated); try { @@ -1028,8 +824,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_state >= StateClosed) { - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } sendMessage(new OutgoingMessage(os, compressFlag != 0, true)); @@ -1046,10 +842,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - synchronized public void - sendNoResponse() + synchronized public void sendNoResponse() { - assert(_state > StateNotValidated); + assert (_state > StateNotValidated); try { if(--_dispatchCount == 0) @@ -1063,8 +858,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_state >= StateClosed) { - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } if(_state == StateClosing && _dispatchCount == 0) @@ -1079,27 +874,25 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - public boolean - systemException(int requestId, Ice.SystemException ex) + public boolean systemException(int requestId, Ice.SystemException ex) { return false; // System exceptions aren't marshalled. } - public IceInternal.EndpointI - endpoint() + public IceInternal.EndpointI endpoint() { - return _endpoint; // No mutex protection necessary, _endpoint is immutable. + return _endpoint; // No mutex protection necessary, _endpoint is + // immutable. } - public IceInternal.Connector - connector() + public IceInternal.Connector connector() { - return _connector; // No mutex protection necessary, _connector is immutable. + return _connector; // No mutex protection necessary, _connector is + // immutable. } @Override - public synchronized void - setAdapter(ObjectAdapter adapter) + public synchronized void setAdapter(ObjectAdapter adapter) { if(_state <= StateNotValidated || _state >= StateClosing) { @@ -1110,7 +903,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_adapter != null) { - _servantManager = ((ObjectAdapterI)_adapter).getServantManager(); + _servantManager = ((ObjectAdapterI) _adapter).getServantManager(); if(_servantManager == null) { _adapter = null; @@ -1129,22 +922,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - public synchronized ObjectAdapter - getAdapter() + public synchronized ObjectAdapter getAdapter() { return _adapter; } @Override - public Endpoint - getEndpoint() + public Endpoint getEndpoint() { - return _endpoint; // No mutex protection necessary, _endpoint is immutable. + return _endpoint; // No mutex protection necessary, _endpoint is + // immutable. } @Override - public ObjectPrx - createProxy(Identity ident) + public ObjectPrx createProxy(Identity ident) { // // Create a reference and return a reverse proxy for this @@ -1157,8 +948,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // Operations from EventHandler // @Override - public void - message(IceInternal.ThreadPoolCurrent current) + public void message(IceInternal.ThreadPoolCurrent current) { StartCallback startCB = null; java.util.List<OutgoingMessage> sentCBs = null; @@ -1213,7 +1003,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } if(_observer != null && !_readHeader) { - assert(!buf.b.hasRemaining()); + assert (!buf.b.hasRemaining()); observerFinishRead(buf); } @@ -1277,7 +1067,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_endpoint.datagram()) { - throw new Ice.DatagramLimitException(); // The message was truncated. + // The message was truncated. + throw new Ice.DatagramLimitException(); } continue; } @@ -1286,7 +1077,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne int newOp = readOp | writeOp; readyOp = readyOp & ~newOp; - assert(readyOp != 0 || newOp != 0); + assert (readyOp != 0 || newOp != 0); if(_state <= StateNotValidated) { @@ -1329,7 +1120,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } else { - assert(_state <= StateClosingPending); + assert (_state <= StateClosingPending); // // We parse messages first, if we receive a close @@ -1337,7 +1128,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // if((readyOp & IceInternal.SocketOperation.Read) != 0) { - info = new MessageInfo(current.stream); // Optimization: use the thread's stream. + // Optimization: use the thread's stream. + info = new MessageInfo(current.stream); newOp |= parseMessage(info); } @@ -1411,19 +1203,21 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne current.ioCompleted(); } - if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher. + if(!_dispatcher) // Optimization, call dispatch() directly if there's no + // dispatcher. { dispatch(startCB, sentCBs, info); } else { - if(info != null && info.heartbeatCallback == null) // No need for the stream if heartbeat callback + // No need for the stream if heartbeat callback + if(info != null && info.heartbeatCallback == null) { // - // Create a new stream for the dispatch instead of using the thread - // pool's thread stream. + // Create a new stream for the dispatch instead of using the + // thread pool's thread stream. // - assert(info.stream == current.stream); + assert (info.stream == current.stream); IceInternal.BasicStream stream = info.stream; info.stream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding); info.stream.swap(stream); @@ -1432,21 +1226,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne final StartCallback finalStartCB = startCB; final java.util.List<OutgoingMessage> finalSentCBs = sentCBs; final MessageInfo finalInfo = info; - _threadPool.dispatchFromThisThread( - new IceInternal.DispatchWorkItem(this) + _threadPool.dispatchFromThisThread(new IceInternal.DispatchWorkItem(this) + { + @Override + public void run() { - @Override - public void - run() - { - dispatch(finalStartCB, finalSentCBs, finalInfo); - } - }); + dispatch(finalStartCB, finalSentCBs, finalInfo); + } + }); } } - protected void - dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info) + protected void dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info) { int count = 0; @@ -1467,7 +1258,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { for(OutgoingMessage msg : sentCBs) { - msg.outAsync.__invokeSent(); + msg.outAsync.invokeSent(); } ++count; } @@ -1480,7 +1271,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // if(info.outAsync != null) { - info.outAsync.__finished(info.stream); + info.outAsync.finished(info.stream); ++count; } @@ -1504,8 +1295,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // if(info.invokeNum > 0) { - invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, - info.adapter); + invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter); // // Don't increase count, the dispatch count is @@ -1552,19 +1342,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - public void - finished(IceInternal.ThreadPoolCurrent current) + public void finished(IceInternal.ThreadPoolCurrent current) { synchronized(this) { - assert(_state == StateClosed); + assert (_state == StateClosed); unscheduleTimeout(IceInternal.SocketOperation.Read | IceInternal.SocketOperation.Write); } // - // If there are no callbacks to call, we don't call ioCompleted() since we're not going - // to call code that will potentially block (this avoids promoting a new leader and - // unecessary thread creation, especially if this is called on shutdown). + // If there are no callbacks to call, we don't call ioCompleted() since + // we're not going to call code that will potentially block (this avoids + // promoting a new leader and unecessary thread creation, especially if + // this is called on shutdown). // if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty() && _callback == null) { @@ -1573,27 +1363,25 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } current.ioCompleted(); - if(!_dispatcher) // Optimization, call finish() directly if there's no dispatcher. + if(!_dispatcher) // Optimization, call finish() directly if there's no + // dispatcher. { finish(); } else { - _threadPool.dispatchFromThisThread( - new IceInternal.DispatchWorkItem(this) + _threadPool.dispatchFromThisThread(new IceInternal.DispatchWorkItem(this) + { + @Override + public void run() { - @Override - public void - run() - { - finish(); - } - }); + finish(); + } + }); } } - public void - finish() + public void finish() { if(_startCallback != null) { @@ -1618,28 +1406,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne p.finished(_exception); if(p.requestId > 0) // Make sure finished isn't called twice. { - if(p.out != null) - { - _requests.remove(p.requestId); - } - else - { - _asyncRequests.remove(p.requestId); - } + _asyncRequests.remove(p.requestId); } } _sendStreams.clear(); } - for(IceInternal.Outgoing p : _requests.values()) - { - p.finished(_exception); - } - _requests.clear(); - for(IceInternal.OutgoingAsync p : _asyncRequests.values()) { - p.__finished(_exception); + p.finished(_exception); } _asyncRequests.clear(); @@ -1657,8 +1432,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } // - // This must be done last as this will cause waitUntilFinished() to return (and communicator - // objects such as the timer might be destroyed too). + // This must be done last as this will cause waitUntilFinished() to + // return (and communicator objects such as the timer might be destroyed + // too). // synchronized(this) { @@ -1671,21 +1447,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - public String - toString() + public String toString() { return _toString(); } @Override - public java.nio.channels.SelectableChannel - fd() + public java.nio.channels.SelectableChannel fd() { return _transceiver.fd(); } - public synchronized void - timedOut() + public synchronized void timedOut() { if(_state <= StateNotValidated) { @@ -1702,49 +1475,45 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - public String - type() + public String type() { return _type; // No mutex lock, _type is immutable. } @Override - public int - timeout() + public int timeout() { - return _endpoint.timeout(); // No mutex protection necessary, _endpoint is immutable. + return _endpoint.timeout(); // No mutex protection necessary, _endpoint + // is immutable. } @Override - public synchronized ConnectionInfo - getInfo() + public synchronized ConnectionInfo getInfo() { if(_state >= StateClosed) { - throw (Ice.LocalException)_exception.fillInStackTrace(); + throw (Ice.LocalException) _exception.fillInStackTrace(); } return initConnectionInfo(); } @Override - public String - _toString() + public String _toString() { return _desc; // No mutex lock, _desc is immutable. } - public synchronized void - exception(LocalException ex) + public synchronized void exception(LocalException ex) { setState(StateClosed, ex); } @Override - public synchronized void - invokeException(int requestId, LocalException ex, int invokeNum) + public synchronized void invokeException(int requestId, LocalException ex, int invokeNum) { // - // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't + // Fatal exception while invoking a request. Since + // sendResponse/sendNoResponse isn't // called in case of a fatal exception we decrement _dispatchCount here. // @@ -1752,9 +1521,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(invokeNum > 0) { - assert(_dispatchCount > 0); + assert (_dispatchCount > 0); _dispatchCount -= invokeNum; - assert(_dispatchCount >= 0); + assert (_dispatchCount >= 0); if(_dispatchCount == 0) { if(_state == StateFinished) @@ -1767,8 +1536,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } public ConnectionI(Communicator communicator, IceInternal.Instance instance, IceInternal.ACMMonitor monitor, - IceInternal.Transceiver transceiver, IceInternal.Connector connector, - IceInternal.EndpointI endpoint, ObjectAdapter adapter) + IceInternal.Transceiver transceiver, IceInternal.Connector connector, IceInternal.EndpointI endpoint, + ObjectAdapter adapter) { _communicator = communicator; _instance = instance; @@ -1780,7 +1549,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _endpoint = endpoint; _adapter = adapter; final Ice.InitializationData initData = instance.initializationData(); - _dispatcher = initData.dispatcher != null; // Cached for better performance. + // Cached for better performance. + _dispatcher = initData.dispatcher != null; _logger = initData.logger; // Cached for better performance. _traceLevels = instance.traceLevels(); // Cached for better performance. _timer = instance.timer(); @@ -1802,7 +1572,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _nextRequestId = 1; _batchAutoFlush = initData.properties.getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0 ? true : false; _batchStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding, - _batchAutoFlush); + _batchAutoFlush); _batchStreamInUse = false; _batchRequestNum = 0; _batchRequestCompress = false; @@ -1828,7 +1598,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_adapter != null) { - _servantManager = ((ObjectAdapterI)_adapter).getServantManager(); + _servantManager = ((ObjectAdapterI) _adapter).getServantManager(); } else { @@ -1839,7 +1609,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_adapter != null) { - _threadPool = ((ObjectAdapterI)_adapter).getThreadPool(); + _threadPool = ((ObjectAdapterI) _adapter).getThreadPool(); } else { @@ -1858,9 +1628,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - protected synchronized void - finalize() - throws Throwable + protected synchronized void finalize() throws Throwable { try { @@ -1868,7 +1636,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceUtilInternal.Assert.FinalizerAssert(_state == StateFinished); IceUtilInternal.Assert.FinalizerAssert(_dispatchCount == 0); IceUtilInternal.Assert.FinalizerAssert(_sendStreams.isEmpty()); - IceUtilInternal.Assert.FinalizerAssert(_requests.isEmpty()); IceUtilInternal.Assert.FinalizerAssert(_asyncRequests.isEmpty()); } catch(java.lang.Exception ex) @@ -1889,14 +1656,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private static final int StateClosed = 6; private static final int StateFinished = 7; - private void - setState(int state, LocalException ex) + private void setState(int state, LocalException ex) { // // If setState() is called with an exception, then only closed // and closing states are permissible. // - assert(state >= StateClosing); + assert (state >= StateClosing); if(_state == state) // Don't switch twice. { @@ -1908,7 +1674,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // If we are in closed state, an exception must be set. // - assert(_state != StateClosed); + assert (_state != StateClosed); _exception = ex; @@ -1924,8 +1690,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _exception instanceof ForcedCloseConnectionException || _exception instanceof ConnectionTimeoutException || _exception instanceof CommunicatorDestroyedException || - _exception instanceof ObjectAdapterDeactivatedException || - (_exception instanceof ConnectionLostException && _state >= StateClosing))) + _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing))) { warning("connection exception", _exception); } @@ -1940,8 +1705,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne setState(state); } - private void - setState(int state) + private void setState(int state) { // // We don't want to send close connection messages if the endpoint @@ -1969,83 +1733,83 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { switch(state) { - case StateNotInitialized: - { - assert(false); - break; - } - - case StateNotValidated: - { - if(_state != StateNotInitialized) + case StateNotInitialized: { - assert(_state == StateClosed); - return; + assert (false); + break; } - break; - } - case StateActive: - { - // - // Can only switch from holding or not validated to - // active. - // - if(_state != StateHolding && _state != StateNotValidated) + case StateNotValidated: { - return; + if(_state != StateNotInitialized) + { + assert (_state == StateClosed); + return; + } + break; } - _threadPool.register(this, IceInternal.SocketOperation.Read); - break; - } - case StateHolding: - { - // - // Can only switch from active or not validated to - // holding. - // - if(_state != StateActive && _state != StateNotValidated) + case StateActive: { - return; + // + // Can only switch from holding or not validated to + // active. + // + if(_state != StateHolding && _state != StateNotValidated) + { + return; + } + _threadPool.register(this, IceInternal.SocketOperation.Read); + break; } - if(_state == StateActive) + + case StateHolding: { - _threadPool.unregister(this, IceInternal.SocketOperation.Read); + // + // Can only switch from active or not validated to + // holding. + // + if(_state != StateActive && _state != StateNotValidated) + { + return; + } + if(_state == StateActive) + { + _threadPool.unregister(this, IceInternal.SocketOperation.Read); + } + break; } - break; - } - case StateClosing: - case StateClosingPending: - { - // - // Can't change back from closing pending. - // - if(_state >= StateClosingPending) + case StateClosing: + case StateClosingPending: { - return; + // + // Can't change back from closing pending. + // + if(_state >= StateClosingPending) + { + return; + } + break; } - break; - } - case StateClosed: - { - if(_state == StateFinished) + case StateClosed: { - return; + if(_state == StateFinished) + { + return; + } + _threadPool.finish(this); + break; } - _threadPool.finish(this); - break; - } - case StateFinished: - { - assert(_state == StateClosed); - _transceiver.close(); - _communicator = null; - break; - } + case StateFinished: + { + assert (_state == StateClosed); + _transceiver.close(); + _communicator = null; + break; + } } } catch(Ice.LocalException ex) @@ -2086,10 +1850,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne Ice.Instrumentation.ConnectionState newState = toConnectionState(state); if(oldState != newState) { - _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), - _endpoint, - newState, - _observer); + _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), _endpoint, newState, + _observer); if(_observer != null) { _observer.attach(); @@ -2106,8 +1868,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _exception instanceof ForcedCloseConnectionException || _exception instanceof ConnectionTimeoutException || _exception instanceof CommunicatorDestroyedException || - _exception instanceof ObjectAdapterDeactivatedException || - (_exception instanceof ConnectionLostException && _state >= StateClosing))) + _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing))) { _observer.failed(_exception.ice_name()); } @@ -2130,11 +1891,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - private void - initiateShutdown() + private void initiateShutdown() { - assert(_state == StateClosing); - assert(_dispatchCount == 0); + assert (_state == StateClosing); + assert (_dispatchCount == 0); if(_shutdownInitiated) { @@ -2148,12 +1908,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // Before we shut down, we send a close connection message. // IceInternal.BasicStream os = new IceInternal.BasicStream(_instance, - IceInternal.Protocol.currentProtocolEncoding); + IceInternal.Protocol.currentProtocolEncoding); os.writeBlob(IceInternal.Protocol.magic); IceInternal.Protocol.currentProtocol.__write(os); IceInternal.Protocol.currentProtocolEncoding.__write(os); os.writeByte(IceInternal.Protocol.closeConnectionMsg); - os.writeByte((byte)0); // compression status: always report 0 for CloseConnection in Java. + os.writeByte((byte) 0); // compression status: always report 0 for + // CloseConnection in Java. os.writeInt(IceInternal.Protocol.headerSize); // Message size. if((sendMessage(new OutgoingMessage(os, false, false)) & IceInternal.AsyncStatus.Sent) > 0) @@ -2161,7 +1922,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne setState(StateClosingPending); // - // Notify the the transceiver of the graceful connection closure. + // Notify the the transceiver of the graceful connection + // closure. // int op = _transceiver.closing(true, _exception); if(op != 0) @@ -2173,20 +1935,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - private void - heartbeat() + private void heartbeat() { - assert(_state == StateActive); + assert (_state == StateActive); if(!_endpoint.datagram()) { IceInternal.BasicStream os = new IceInternal.BasicStream(_instance, - IceInternal.Protocol.currentProtocolEncoding); + IceInternal.Protocol.currentProtocolEncoding); os.writeBlob(IceInternal.Protocol.magic); IceInternal.Protocol.currentProtocol.__write(os); IceInternal.Protocol.currentProtocolEncoding.__write(os); os.writeByte(IceInternal.Protocol.validateConnectionMsg); - os.writeByte((byte)0); + os.writeByte((byte) 0); os.writeInt(IceInternal.Protocol.headerSize); // Message size. try @@ -2197,13 +1958,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne catch(Ice.LocalException ex) { setState(StateClosed, ex); - assert(_exception != null); + assert (_exception != null); } } } - private boolean - initialize(int operation) + private boolean initialize(int operation) { int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), _hasMoreData); if(s != IceInternal.SocketOperation.None) @@ -2214,7 +1974,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } // - // Update the connection description once the transceiver is initialized. + // Update the connection description once the transceiver is + // initialized. // _desc = _transceiver.toString(); setState(StateNotValidated); @@ -2222,12 +1983,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return true; } - private boolean - validate(int operation) + private boolean validate(int operation) { - if(!_endpoint.datagram()) // Datagram connections are always implicitly validated. + if(!_endpoint.datagram()) // Datagram connections are always implicitly + // validated. { - if(_adapter != null) // The server side has the active role for connection validation. + if(_adapter != null) // The server side has the active role for + // connection validation. { if(_writeStream.isEmpty()) { @@ -2235,8 +1997,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceInternal.Protocol.currentProtocol.__write(_writeStream); IceInternal.Protocol.currentProtocolEncoding.__write(_writeStream); _writeStream.writeByte(IceInternal.Protocol.validateConnectionMsg); - _writeStream.writeByte((byte)0); // Compression status (always zero for validate connection). - _writeStream.writeInt(IceInternal.Protocol.headerSize); // Message size. + _writeStream.writeByte((byte) 0); // Compression status + // (always zero for + // validate connection). + _writeStream.writeInt(IceInternal.Protocol.headerSize); // Message + // size. IceInternal.TraceUtil.traceSend(_writeStream, _logger, _traceLevels); _writeStream.prepareWrite(); } @@ -2262,7 +2027,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne observerFinishWrite(_writeStream.getBuffer()); } } - else // The client side has the passive role for connection validation. + else + // The client side has the passive role for connection validation. { if(_readStream.isEmpty()) { @@ -2291,7 +2057,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne observerFinishRead(_readStream.getBuffer()); } - assert(_readStream.pos() == IceInternal.Protocol.headerSize); + assert (_readStream.pos() == IceInternal.Protocol.headerSize); _readStream.pos(0); byte[] m = _readStream.readBlob(4); if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || @@ -2313,7 +2079,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { throw new ConnectionNotValidatedException(); } - _readStream.readByte(); // Ignore compression status for validate connection. + _readStream.readByte(); // Ignore compression status for + // validate connection. int size = _readStream.readInt(); if(size != IceInternal.Protocol.headerSize) { @@ -2335,8 +2102,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return true; } - private int - sendNextMessage(java.util.List<OutgoingMessage> callbacks) + private int sendNextMessage(java.util.List<OutgoingMessage> callbacks) { if(_sendStreams.isEmpty()) { @@ -2344,13 +2110,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } else if(_state == StateClosingPending && _writeStream.pos() == 0) { - // Message wasn't sent, empty the _writeStream, we're not going to send more data. + // Message wasn't sent, empty the _writeStream, we're not going to + // send more data. OutgoingMessage message = _sendStreams.getFirst(); _writeStream.swap(message.stream); return IceInternal.SocketOperation.None; } - assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size()); + assert (!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size()); try { while(true) @@ -2390,7 +2157,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // Otherwise, prepare the next message stream for writing. // message = _sendStreams.getFirst(); - assert(!message.prepared); + assert (!message.prepared); IceInternal.BasicStream stream = message.stream; message.stream = doCompress(stream, message.compress); @@ -2435,8 +2202,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } // - // If all the messages were sent and we are in the closing state, we schedule - // the close timeout to wait for the peer to close the connection. + // If all the messages were sent and we are in the closing state, we + // schedule the close timeout to wait for the peer to close the + // connection. // if(_state == StateClosing && _dispatchCount == 0) { @@ -2451,10 +2219,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return IceInternal.SocketOperation.None; } - private int - sendMessage(OutgoingMessage message) + private int sendMessage(OutgoingMessage message) { - assert(_state < StateClosed); + assert (_state < StateClosed); if(!_sendStreams.isEmpty()) { @@ -2464,11 +2231,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } // - // Attempt to send the message without blocking. If the send blocks, we register - // the connection with the selector thread. + // Attempt to send the message without blocking. If the send blocks, we + // register the connection with the selector thread. // - assert(!message.prepared); + assert (!message.prepared); IceInternal.BasicStream stream = message.stream; @@ -2523,15 +2290,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return IceInternal.AsyncStatus.Queued; } - private IceInternal.BasicStream - doCompress(IceInternal.BasicStream uncompressed, boolean compress) + private IceInternal.BasicStream doCompress(IceInternal.BasicStream uncompressed, boolean compress) { boolean compressionSupported = false; if(compress) { // - // Don't check whether compression support is available unless the proxy - // is configured for compression. + // Don't check whether compression support is available unless the + // proxy is configured for compression. // compressionSupported = IceInternal.BasicStream.compressible(); } @@ -2548,7 +2314,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // Set compression status. // cstream.pos(9); - cstream.writeByte((byte)2); + cstream.writeByte((byte) 2); // // Write the size of the compressed stream into the header. @@ -2557,11 +2323,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne cstream.writeInt(cstream.size()); // - // Write the compression status and size of the compressed stream into the header of the - // uncompressed stream -- we need this to trace requests correctly. + // Write the compression status and size of the compressed + // stream into the header of the uncompressed stream -- we need + // this to trace requests correctly. // uncompressed.pos(9); - uncompressed.writeByte((byte)2); + uncompressed.writeByte((byte) 2); uncompressed.writeInt(cstream.size()); return cstream; @@ -2569,7 +2336,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } uncompressed.pos(9); - uncompressed.writeByte((byte)(compressionSupported ? 1 : 0)); + uncompressed.writeByte((byte) (compressionSupported ? 1 : 0)); // // Not compressed, fill in the message size. @@ -2597,17 +2364,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne ConnectionCallback heartbeatCallback; } - private int - parseMessage(MessageInfo info) + private int parseMessage(MessageInfo info) { - assert(_state > StateNotValidated && _state < StateClosed); + assert (_state > StateNotValidated && _state < StateClosed); _readStream.swap(info.stream); _readStream.resize(IceInternal.Protocol.headerSize, true); _readStream.pos(0); _readHeader = true; - assert(info.stream.pos() == info.stream.size()); + assert (info.stream.pos() == info.stream.size()); // // Connection is validated on first message. This is only used by @@ -2626,7 +2392,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne info.stream.pos(8); byte messageType = info.stream.readByte(); info.compress = info.stream.readByte(); - if(info.compress == (byte)2) + if(info.compress == (byte) 2) { if(IceInternal.BasicStream.compressible()) { @@ -2636,7 +2402,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { FeatureNotSupportedException ex = new FeatureNotSupportedException(); ex.unsupportedFeature = "Cannot uncompress compressed message: " - + "org.apache.tools.bzip2.CBZip2OutputStream was not found"; + + "org.apache.tools.bzip2.CBZip2OutputStream was not found"; throw ex; } } @@ -2659,7 +2425,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne setState(StateClosingPending, new CloseConnectionException()); // - // Notify the the transceiver of the graceful connection closure. + // Notify the the transceiver of the graceful connection + // closure. // int op = _transceiver.closing(false, _exception); if(op != 0) @@ -2675,9 +2442,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state >= StateClosing) { - IceInternal.TraceUtil.trace("received request during closing\n" + - "(ignored by server, client will retry)", - info.stream, _logger, _traceLevels); + IceInternal.TraceUtil.trace("received request during closing\n" + + "(ignored by server, client will retry)", info.stream, _logger, + _traceLevels); } else { @@ -2695,9 +2462,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state >= StateClosing) { - IceInternal.TraceUtil.trace("received batch request during closing\n" + - "(ignored by server, client will retry)", - info.stream, _logger, _traceLevels); + IceInternal.TraceUtil.trace("received batch request during closing\n" + + "(ignored by server, client will retry)", info.stream, _logger, + _traceLevels); } else { @@ -2719,18 +2486,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels); info.requestId = info.stream.readInt(); - IceInternal.Outgoing out = _requests.remove(info.requestId); - if(out != null) - { - out.finished(info.stream); - } - else + info.outAsync = _asyncRequests.remove(info.requestId); + if(info.outAsync != null) { - info.outAsync = _asyncRequests.remove(info.requestId); - if(info.outAsync != null) - { - ++_dispatchCount; - } + ++_dispatchCount; } notifyAll(); // Notify threads blocked in close(false) break; @@ -2750,7 +2509,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne default: { IceInternal.TraceUtil.trace("received unknown message\n(invalid, closing connection)", info.stream, - _logger, _traceLevels); + _logger, _traceLevels); throw new UnknownMessageException(); } } @@ -2773,9 +2532,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return _state == StateHolding ? IceInternal.SocketOperation.None : IceInternal.SocketOperation.Read; } - private void - invokeAll(IceInternal.BasicStream stream, int invokeNum, int requestId, byte compress, - IceInternal.ServantManager servantManager, ObjectAdapter adapter) + private void invokeAll(IceInternal.BasicStream stream, int invokeNum, int requestId, byte compress, + IceInternal.ServantManager servantManager, ObjectAdapter adapter) { // // Note: In contrast to other private or protected methods, this @@ -2811,7 +2569,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { invokeException(requestId, ex, invokeNum); } - catch(java.lang.AssertionError ex) // Upon assertion, we print the stack trace. + catch(java.lang.AssertionError ex) // Upon assertion, we print the stack + // trace. { UnknownException uex = new UnknownException(ex); java.io.StringWriter sw = new java.io.StringWriter(); @@ -2842,8 +2601,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - private void - scheduleTimeout(int status) + private void scheduleTimeout(int status) { int timeout; if(_state < StateActive) @@ -2892,8 +2650,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { _readTimeoutFuture.cancel(false); } - _readTimeoutFuture = _timer.schedule(_readTimeout, timeout, - java.util.concurrent.TimeUnit.MILLISECONDS); + _readTimeoutFuture = _timer.schedule(_readTimeout, timeout, java.util.concurrent.TimeUnit.MILLISECONDS); } if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0) { @@ -2902,17 +2659,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _writeTimeoutFuture.cancel(false); } _writeTimeoutFuture = _timer.schedule(_writeTimeout, timeout, - java.util.concurrent.TimeUnit.MILLISECONDS); + java.util.concurrent.TimeUnit.MILLISECONDS); } } catch(Throwable ex) { - assert(false); + assert (false); } } - private void - unscheduleTimeout(int status) + private void unscheduleTimeout(int status) { if((status & IceInternal.SocketOperation.Read) != 0 && _readTimeoutFuture != null) { @@ -2927,8 +2683,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - private ConnectionInfo - initConnectionInfo() + private ConnectionInfo initConnectionInfo() { if(_info != null) { @@ -2941,19 +2696,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne info.incoming = _connector == null; if(_state > StateNotInitialized) { - _info = info; // Cache the connection information only if initialized. + _info = info; // Cache the connection information only if + // initialized. } return info; } - private Ice.Instrumentation.ConnectionState - toConnectionState(int state) + private Ice.Instrumentation.ConnectionState toConnectionState(int state) { return connectionStateMap[state]; } - private void - warning(String msg, java.lang.Exception ex) + private void warning(String msg, java.lang.Exception ex) { java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); @@ -2963,42 +2717,38 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _logger.warning(s); } - private void - observerStartRead(IceInternal.Buffer buf) + private void observerStartRead(IceInternal.Buffer buf) { if(_readStreamPos >= 0) { - assert(!buf.empty()); + assert (!buf.empty()); _observer.receivedBytes(buf.b.position() - _readStreamPos); } _readStreamPos = buf.empty() ? -1 : buf.b.position(); } - private void - observerFinishRead(IceInternal.Buffer buf) + private void observerFinishRead(IceInternal.Buffer buf) { if(_readStreamPos == -1) { return; } - assert(buf.b.position() >= _readStreamPos); + assert (buf.b.position() >= _readStreamPos); _observer.receivedBytes(buf.b.position() - _readStreamPos); _readStreamPos = -1; } - private void - observerStartWrite(IceInternal.Buffer buf) + private void observerStartWrite(IceInternal.Buffer buf) { if(_writeStreamPos >= 0) { - assert(!buf.empty()); + assert (!buf.empty()); _observer.sentBytes(buf.b.position() - _writeStreamPos); } _writeStreamPos = buf.empty() ? -1 : buf.b.position(); } - private void - observerFinishWrite(IceInternal.Buffer buf) + private void observerFinishWrite(IceInternal.Buffer buf) { if(_writeStreamPos == -1) { @@ -3011,8 +2761,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _writeStreamPos = -1; } - private IceInternal.Incoming - getIncoming(ObjectAdapter adapter, boolean response, byte compress, int requestId) + private IceInternal.Incoming getIncoming(ObjectAdapter adapter, boolean response, byte compress, int requestId) { IceInternal.Incoming in = null; @@ -3041,8 +2790,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return in; } - private void - reclaimIncoming(IceInternal.Incoming in) + private void reclaimIncoming(IceInternal.Incoming in) { if(_cacheBuffers > 0) { @@ -3058,8 +2806,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - private void - reap() + private void reap() { if(_monitor != null) { @@ -3086,7 +2833,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { wait(); } - catch (InterruptedException e) + catch(InterruptedException e) { interrupted = true; } @@ -3110,17 +2857,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne this.requestId = 0; } - OutgoingMessage(IceInternal.OutgoingMessageCallback out, IceInternal.BasicStream stream, boolean compress, - int requestId) - { - this.stream = stream; - this.compress = compress; - this.out = out; - this.requestId = requestId; - } - OutgoingMessage(IceInternal.OutgoingAsyncMessageCallback out, IceInternal.BasicStream stream, boolean compress, - int requestId) + int requestId) { this.stream = stream; this.compress = compress; @@ -3128,56 +2866,42 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne this.requestId = requestId; } - public void - timedOut() + public void timedOut() { - assert((out != null || outAsync != null)); - out = null; + assert (outAsync != null); outAsync = null; } - public void - adopt() + public void adopt() { if(adopt) { - IceInternal.BasicStream stream = - new IceInternal.BasicStream(this.stream.instance(), IceInternal.Protocol.currentProtocolEncoding); + IceInternal.BasicStream stream = new IceInternal.BasicStream(this.stream.instance(), + IceInternal.Protocol.currentProtocolEncoding); stream.swap(this.stream); this.stream = stream; adopt = false; } } - public boolean - sent() + public boolean sent() { - if(out != null) + if(outAsync != null) { - out.sent(); - } - else if(outAsync != null) - { - return outAsync.__sent(); + return outAsync.sent(); } return false; } - public void - finished(Ice.LocalException ex) + public void finished(Ice.LocalException ex) { - if(out != null) - { - out.finished(ex); - } - else if(outAsync != null) + if(outAsync != null) { - outAsync.__finished(ex); + outAsync.finished(ex); } } public IceInternal.BasicStream stream; - public IceInternal.OutgoingMessageCallback out; public IceInternal.OutgoingAsyncMessageCallback outAsync; public boolean compress; public int requestId; @@ -3219,10 +2943,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private int _nextRequestId; - private java.util.Map<Integer, IceInternal.Outgoing> _requests = - new java.util.HashMap<Integer, IceInternal.Outgoing>(); - private java.util.Map<Integer, IceInternal.OutgoingAsync> _asyncRequests = - new java.util.HashMap<Integer, IceInternal.OutgoingAsync>(); + private java.util.Map<Integer, IceInternal.OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, IceInternal.OutgoingAsync>(); private LocalException _exception; @@ -3262,14 +2983,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private ConnectionCallback _callback; private static Ice.Instrumentation.ConnectionState connectionStateMap[] = { - Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized - Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotValidated - Ice.Instrumentation.ConnectionState.ConnectionStateActive, // StateActive - Ice.Instrumentation.ConnectionState.ConnectionStateHolding, // StateHolding - Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosing - Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosingPending - Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed - Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished + Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized + Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotValidated + Ice.Instrumentation.ConnectionState.ConnectionStateActive, // StateActive + Ice.Instrumentation.ConnectionState.ConnectionStateHolding, // StateHolding + Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosing + Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosingPending + Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed + Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished }; } |