diff options
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 4182 |
1 files changed, 2091 insertions, 2091 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 4d55cb3a05b..85425864879 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -27,229 +27,229 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public void validate() { - if(!_endpoint.datagram()) // Datagram connections are always implicitly validated. - { - boolean active; - - synchronized(this) - { - if(_thread != null && _thread != Thread.currentThread()) - { - // - // In thread per connection mode, this connection's thread - // will take care of connection validation. Therefore all we - // have to do here is to wait until this thread has completed - // validation. - // - while(_state == StateNotValidated) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - if(_state >= StateClosing) - { - assert(_exception != null); - throw _exception; - } - - return; - } - - // - // The connection might already be closed (e.g.: the communicator - // was destroyed or object adapter deactivated.) - // - assert(_state == StateNotValidated || _state == StateClosed); - if(_state == StateClosed) - { - assert(_exception != null); - throw _exception; - } - - if(_adapter != null) - { - active = true; // The server side has the active role for connection validation. - } - else - { - active = false; // The client side has the passive role for connection validation. - } - } - - try - { - int timeout; - IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideConnectTimeout) - { - timeout = defaultsAndOverrides.overrideConnectTimeoutValue; - } - else - { - timeout = _endpoint.timeout(); - } - - if(active) - { - synchronized(_sendMutex) - { - if(_transceiver == null) // Has the transceiver already been closed? - { - assert(_exception != null); - throw _exception; // The exception is immutable at this point. - } - - IceInternal.BasicStream os = new IceInternal.BasicStream(_instance); - os.writeBlob(IceInternal.Protocol.magic); - os.writeByte(IceInternal.Protocol.protocolMajor); - os.writeByte(IceInternal.Protocol.protocolMinor); - os.writeByte(IceInternal.Protocol.encodingMajor); - os.writeByte(IceInternal.Protocol.encodingMinor); - os.writeByte(IceInternal.Protocol.validateConnectionMsg); - os.writeByte((byte)0); // Compression status (always zero for validate connection). - os.writeInt(IceInternal.Protocol.headerSize); // Message size. - IceInternal.TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels); - try - { - _transceiver.write(os, timeout); - } - catch(Ice.TimeoutException ex) - { - throw new Ice.ConnectTimeoutException(); - } - } - } - else - { - IceInternal.BasicStream is = new IceInternal.BasicStream(_instance); - is.resize(IceInternal.Protocol.headerSize, true); - is.pos(0); - try - { - _transceiver.read(is, timeout); - } - catch(Ice.TimeoutException ex) - { - throw new Ice.ConnectTimeoutException(); - } - assert(is.pos() == IceInternal.Protocol.headerSize); - is.pos(0); - byte[] m = is.readBlob(4); - if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || - m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) - { - BadMagicException ex = new BadMagicException(); - ex.badMagic = m; - throw ex; - } - byte pMajor = is.readByte(); - byte pMinor = is.readByte(); - if(pMajor != IceInternal.Protocol.protocolMajor) - { - UnsupportedProtocolException e = new UnsupportedProtocolException(); - e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor; - e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor; - e.major = IceInternal.Protocol.protocolMajor; - e.minor = IceInternal.Protocol.protocolMinor; - throw e; - } - byte eMajor = is.readByte(); - byte eMinor = is.readByte(); - if(eMajor != IceInternal.Protocol.encodingMajor) - { - UnsupportedEncodingException e = new UnsupportedEncodingException(); - e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor; - e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor; - e.major = IceInternal.Protocol.encodingMajor; - e.minor = IceInternal.Protocol.encodingMinor; - throw e; - } - byte messageType = is.readByte(); - if(messageType != IceInternal.Protocol.validateConnectionMsg) - { - throw new ConnectionNotValidatedException(); - } - byte compress = is.readByte(); // Ignore compression status for validate connection. - int size = is.readInt(); - if(size != IceInternal.Protocol.headerSize) - { - throw new IllegalMessageSizeException(); - } - IceInternal.TraceUtil.traceHeader("received validate connection", is, _logger, _traceLevels); - } - } - catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). - { - synchronized(this) - { - setState(StateClosed, ex.get()); - assert(_exception != null); - throw _exception; - } - } - catch(LocalException ex) - { - synchronized(this) - { - setState(StateClosed, ex); - assert(_exception != null); - throw _exception; - } - } - } - - synchronized(this) - { - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; - } - - // - // We start out in holding state. - // - setState(StateHolding); - } + if(!_endpoint.datagram()) // Datagram connections are always implicitly validated. + { + boolean active; + + synchronized(this) + { + if(_thread != null && _thread != Thread.currentThread()) + { + // + // In thread per connection mode, this connection's thread + // will take care of connection validation. Therefore all we + // have to do here is to wait until this thread has completed + // validation. + // + while(_state == StateNotValidated) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + if(_state >= StateClosing) + { + assert(_exception != null); + throw _exception; + } + + return; + } + + // + // The connection might already be closed (e.g.: the communicator + // was destroyed or object adapter deactivated.) + // + assert(_state == StateNotValidated || _state == StateClosed); + if(_state == StateClosed) + { + assert(_exception != null); + throw _exception; + } + + if(_adapter != null) + { + active = true; // The server side has the active role for connection validation. + } + else + { + active = false; // The client side has the passive role for connection validation. + } + } + + try + { + int timeout; + IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + if(defaultsAndOverrides.overrideConnectTimeout) + { + timeout = defaultsAndOverrides.overrideConnectTimeoutValue; + } + else + { + timeout = _endpoint.timeout(); + } + + if(active) + { + synchronized(_sendMutex) + { + if(_transceiver == null) // Has the transceiver already been closed? + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } + + IceInternal.BasicStream os = new IceInternal.BasicStream(_instance); + os.writeBlob(IceInternal.Protocol.magic); + os.writeByte(IceInternal.Protocol.protocolMajor); + os.writeByte(IceInternal.Protocol.protocolMinor); + os.writeByte(IceInternal.Protocol.encodingMajor); + os.writeByte(IceInternal.Protocol.encodingMinor); + os.writeByte(IceInternal.Protocol.validateConnectionMsg); + os.writeByte((byte)0); // Compression status (always zero for validate connection). + os.writeInt(IceInternal.Protocol.headerSize); // Message size. + IceInternal.TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels); + try + { + _transceiver.write(os, timeout); + } + catch(Ice.TimeoutException ex) + { + throw new Ice.ConnectTimeoutException(); + } + } + } + else + { + IceInternal.BasicStream is = new IceInternal.BasicStream(_instance); + is.resize(IceInternal.Protocol.headerSize, true); + is.pos(0); + try + { + _transceiver.read(is, timeout); + } + catch(Ice.TimeoutException ex) + { + throw new Ice.ConnectTimeoutException(); + } + assert(is.pos() == IceInternal.Protocol.headerSize); + is.pos(0); + byte[] m = is.readBlob(4); + if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || + m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) + { + BadMagicException ex = new BadMagicException(); + ex.badMagic = m; + throw ex; + } + byte pMajor = is.readByte(); + byte pMinor = is.readByte(); + if(pMajor != IceInternal.Protocol.protocolMajor) + { + UnsupportedProtocolException e = new UnsupportedProtocolException(); + e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor; + e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor; + e.major = IceInternal.Protocol.protocolMajor; + e.minor = IceInternal.Protocol.protocolMinor; + throw e; + } + byte eMajor = is.readByte(); + byte eMinor = is.readByte(); + if(eMajor != IceInternal.Protocol.encodingMajor) + { + UnsupportedEncodingException e = new UnsupportedEncodingException(); + e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor; + e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor; + e.major = IceInternal.Protocol.encodingMajor; + e.minor = IceInternal.Protocol.encodingMinor; + throw e; + } + byte messageType = is.readByte(); + if(messageType != IceInternal.Protocol.validateConnectionMsg) + { + throw new ConnectionNotValidatedException(); + } + byte compress = is.readByte(); // Ignore compression status for validate connection. + int size = is.readInt(); + if(size != IceInternal.Protocol.headerSize) + { + throw new IllegalMessageSizeException(); + } + IceInternal.TraceUtil.traceHeader("received validate connection", is, _logger, _traceLevels); + } + } + catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). + { + synchronized(this) + { + setState(StateClosed, ex.get()); + assert(_exception != null); + throw _exception; + } + } + catch(LocalException ex) + { + synchronized(this) + { + setState(StateClosed, ex); + assert(_exception != null); + throw _exception; + } + } + } + + synchronized(this) + { + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } + + // + // We start out in holding state. + // + setState(StateHolding); + } } public synchronized void activate() { - while(_state == StateNotValidated) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - setState(StateActive); + while(_state == StateNotValidated) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + setState(StateActive); } public synchronized void hold() { - while(_state == StateNotValidated) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - setState(StateHolding); + while(_state == StateNotValidated) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + setState(StateHolding); } // DestructionReason. @@ -259,93 +259,93 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public synchronized void destroy(int reason) { - switch(reason) - { - case ObjectAdapterDeactivated: - { - setState(StateClosing, new ObjectAdapterDeactivatedException()); - break; - } - - case CommunicatorDestroyed: - { - setState(StateClosing, new CommunicatorDestroyedException()); - break; - } - } + switch(reason) + { + case ObjectAdapterDeactivated: + { + setState(StateClosing, new ObjectAdapterDeactivatedException()); + break; + } + + case CommunicatorDestroyed: + { + setState(StateClosing, new CommunicatorDestroyedException()); + break; + } + } } public synchronized void close(boolean force) { - if(force) - { - setState(StateClosed, new ForcedCloseConnectionException()); - } - else - { - // - // If we do a graceful shutdown, then we wait until all - // outstanding requests have been completed. Otherwise, - // the CloseConnectionException will cause all outstanding - // requests to be retried, regardless of whether the - // server has processed them or not. - // - while(!_requests.isEmpty() || !_asyncRequests.isEmpty()) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - setState(StateClosing, new CloseConnectionException()); - } + if(force) + { + setState(StateClosed, new ForcedCloseConnectionException()); + } + else + { + // + // If we do a graceful shutdown, then we wait until all + // outstanding requests have been completed. Otherwise, + // the CloseConnectionException will cause all outstanding + // requests to be retried, regardless of whether the + // server has processed them or not. + // + while(!_requests.isEmpty() || !_asyncRequests.isEmpty()) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + setState(StateClosing, new CloseConnectionException()); + } } public synchronized boolean isDestroyed() { - return _state >= StateClosing; + return _state >= StateClosing; } public boolean isFinished() { - Thread threadPerConnection = null; - - synchronized(this) - { - if(_transceiver != null || _dispatchCount != 0 || (_thread != null && _thread.isAlive())) - { - return false; - } - - assert(_state == StateClosed); - - threadPerConnection = _thread; - _thread = null; - } - - if(threadPerConnection != null) - { - while(true) - { - try - { - threadPerConnection.join(); - break; - } - catch(InterruptedException ex) - { - } - } - } - - return true; + Thread threadPerConnection = null; + + synchronized(this) + { + if(_transceiver != null || _dispatchCount != 0 || (_thread != null && _thread.isAlive())) + { + return false; + } + + assert(_state == StateClosed); + + threadPerConnection = _thread; + _thread = null; + } + + if(threadPerConnection != null) + { + while(true) + { + try + { + threadPerConnection.join(); + break; + } + catch(InterruptedException ex) + { + } + } + } + + return true; } public synchronized void @@ -361,457 +361,457 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public synchronized void waitUntilHolding() { - while(_state < StateHolding || _dispatchCount > 0) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } + while(_state < StateHolding || _dispatchCount > 0) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } } } public void waitUntilFinished() { - Thread threadPerConnection = null; - - synchronized(this) - { - // - // We wait indefinitely until connection closing has been - // initiated. We also wait indefinitely until all outstanding - // requests are completed. Otherwise we couldn't guarantee - // that there are no outstanding calls when deactivate() is - // called on the servant locators. - // - while(_state < StateClosing || _dispatchCount > 0) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - // - // Now we must wait until close() has been called on the - // transceiver. - // - while(_transceiver != null) - { - try - { - if(_state != StateClosed && _endpoint.timeout() >= 0) - { - long absoluteWaitTime = _stateTime + _endpoint.timeout(); - long waitTime = absoluteWaitTime - System.currentTimeMillis(); - - if(waitTime > 0) - { - // - // We must wait a bit longer until we close this - // connection. - // - wait(waitTime); - if(System.currentTimeMillis() >= absoluteWaitTime) - { - setState(StateClosed, new CloseTimeoutException()); - } - } - else - { - // - // We already waited long enough, so let's close this - // connection! - // - setState(StateClosed, new CloseTimeoutException()); - } - - // - // No return here, we must still wait until - // close() is called on the _transceiver. - // - } - else - { - wait(); - } - } - catch(InterruptedException ex) - { - } - } - - assert(_state == StateClosed); - - threadPerConnection = _thread; - _thread = null; + Thread threadPerConnection = null; + + synchronized(this) + { + // + // We wait indefinitely until connection closing has been + // initiated. We also wait indefinitely until all outstanding + // requests are completed. Otherwise we couldn't guarantee + // that there are no outstanding calls when deactivate() is + // called on the servant locators. + // + while(_state < StateClosing || _dispatchCount > 0) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + // + // Now we must wait until close() has been called on the + // transceiver. + // + while(_transceiver != null) + { + try + { + if(_state != StateClosed && _endpoint.timeout() >= 0) + { + long absoluteWaitTime = _stateTime + _endpoint.timeout(); + long waitTime = absoluteWaitTime - System.currentTimeMillis(); + + if(waitTime > 0) + { + // + // We must wait a bit longer until we close this + // connection. + // + wait(waitTime); + if(System.currentTimeMillis() >= absoluteWaitTime) + { + setState(StateClosed, new CloseTimeoutException()); + } + } + else + { + // + // We already waited long enough, so let's close this + // connection! + // + setState(StateClosed, new CloseTimeoutException()); + } + + // + // No return here, we must still wait until + // close() is called on the _transceiver. + // + } + else + { + wait(); + } + } + catch(InterruptedException ex) + { + } + } + + assert(_state == StateClosed); + + threadPerConnection = _thread; + _thread = null; // // Clear the OA. See bug 1673 for the details of why this is necessary. // _adapter = null; - } - - if(threadPerConnection != null) - { - while(true) - { - try - { - threadPerConnection.join(); - break; - } - catch(InterruptedException ex) - { - } - } - } + } + + if(threadPerConnection != null) + { + while(true) + { + try + { + threadPerConnection.join(); + break; + } + catch(InterruptedException ex) + { + } + } + } } public synchronized void monitor() { - if(_state != StateActive) - { - return; - } - - // - // Check for timed out async requests. - // - java.util.Iterator i = _asyncRequests.entryIterator(); - while(i.hasNext()) - { - IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next(); - IceInternal.OutgoingAsync out = (IceInternal.OutgoingAsync)e.getValue(); - if(out.__timedOut()) - { - setState(StateClosed, new TimeoutException()); - return; - } - } - - // - // Active connection management for idle connections. - // - if(_acmTimeout > 0 && - _requests.isEmpty() && _asyncRequests.isEmpty() && - !_batchStreamInUse && _batchStream.isEmpty() && - _dispatchCount == 0) - { - if(System.currentTimeMillis() >= _acmAbsoluteTimeoutMillis) - { - setState(StateClosing, new ConnectionTimeoutException()); - return; - } - } + if(_state != StateActive) + { + return; + } + + // + // Check for timed out async requests. + // + java.util.Iterator i = _asyncRequests.entryIterator(); + while(i.hasNext()) + { + IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next(); + IceInternal.OutgoingAsync out = (IceInternal.OutgoingAsync)e.getValue(); + if(out.__timedOut()) + { + setState(StateClosed, new TimeoutException()); + return; + } + } + + // + // Active connection management for idle connections. + // + if(_acmTimeout > 0 && + _requests.isEmpty() && _asyncRequests.isEmpty() && + !_batchStreamInUse && _batchStream.isEmpty() && + _dispatchCount == 0) + { + if(System.currentTimeMillis() >= _acmAbsoluteTimeoutMillis) + { + setState(StateClosing, new ConnectionTimeoutException()); + return; + } + } } public void sendRequest(IceInternal.BasicStream os, IceInternal.Outgoing out, boolean compress) throws IceInternal.LocalExceptionWrapper { - int requestId = 0; - IceInternal.BasicStream stream = null; - - synchronized(this) - { - assert(!(out != null && _endpoint.datagram())); // Twoway requests cannot be datagrams. - - if(_exception != null) - { - // - // If the connection is closed before we even have a chance - // to send our request, we always try to send the request - // again. - // - throw new IceInternal.LocalExceptionWrapper(_exception, true); - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - // - // Only add to the request map if this is a twoway call. - // - if(out != null) - { - // - // Create a new unique request ID. - // - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } - - // - // Fill in the request ID. - // - os.pos(IceInternal.Protocol.headerSize); - os.writeInt(requestId); - - // - // Add to the requests map. - // - _requests.put(requestId, out); - } - - stream = doCompress(os, _overrideCompress ? _overrideCompressValue : compress); - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; - } - } - - try - { - synchronized(_sendMutex) - { - if(_transceiver == null) // Has the transceiver already been closed? - { - assert(_exception != null); - throw _exception; // The exception is immutable at this point. - } - - // - // Send the request. - // - IceInternal.TraceUtil.traceRequest("sending request", os, _logger, _traceLevels); - _transceiver.write(stream, _endpoint.timeout()); - } - } - catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). - { - synchronized(this) - { - setState(StateClosed, ex.get()); - assert(_exception != null); - - if(out != null) - { - // - // If the request has already been removed from - // the request map, we are out of luck. It would - // mean that finished() has been called already, - // and therefore the exception has been set using - // the Outgoing::finished() callback. In this - // case, we cannot throw the exception here, - // because we must not both raise an exception and - // have Outgoing::finished() called with an - // exception. This means that in some rare cases, - // a request will not be retried even though it - // could. But I honestly don't know how I could - // avoid this, without a very elaborate and - // complex design, which would be bad for - // performance. - // - IceInternal.Outgoing o = (IceInternal.Outgoing)_requests.remove(requestId); - if(o != null) - { - assert(o == out); - throw new IceInternal.LocalExceptionWrapper(_exception, ex.retry()); - } - } - else - { - throw new IceInternal.LocalExceptionWrapper(_exception, ex.retry()); - } - } - } - catch(LocalException ex) - { - synchronized(this) - { - setState(StateClosed, ex); - assert(_exception != null); - - if(out != null) - { - // - // If the request has already been removed from - // the request map, we are out of luck. It would - // mean that finished() has been called already, - // and therefore the exception has been set using - // the Outgoing::finished() callback. In this - // case, we cannot throw the exception here, - // because we must not both raise an exception and - // have Outgoing::finished() called with an - // exception. This means that in some rare cases, - // a request will not be retried even though it - // could. But I honestly don't know how I could - // avoid this, without a very elaborate and - // complex design, which would be bad for - // performance. - // - IceInternal.Outgoing o = (IceInternal.Outgoing)_requests.remove(requestId); - if(o != null) - { - assert(o == out); - throw _exception; - } - } - else - { - throw _exception; - } - } - } + int requestId = 0; + IceInternal.BasicStream stream = null; + + synchronized(this) + { + assert(!(out != null && _endpoint.datagram())); // Twoway requests cannot be datagrams. + + if(_exception != null) + { + // + // If the connection is closed before we even have a chance + // to send our request, we always try to send the request + // again. + // + throw new IceInternal.LocalExceptionWrapper(_exception, true); + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + + // + // Only add to the request map if this is a twoway call. + // + if(out != null) + { + // + // Create a new unique request ID. + // + requestId = _nextRequestId++; + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + + // + // Fill in the request ID. + // + os.pos(IceInternal.Protocol.headerSize); + os.writeInt(requestId); + + // + // Add to the requests map. + // + _requests.put(requestId, out); + } + + stream = doCompress(os, _overrideCompress ? _overrideCompressValue : compress); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } + } + + try + { + synchronized(_sendMutex) + { + if(_transceiver == null) // Has the transceiver already been closed? + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } + + // + // Send the request. + // + IceInternal.TraceUtil.traceRequest("sending request", os, _logger, _traceLevels); + _transceiver.write(stream, _endpoint.timeout()); + } + } + catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). + { + synchronized(this) + { + setState(StateClosed, ex.get()); + assert(_exception != null); + + if(out != null) + { + // + // If the request has already been removed from + // the request map, we are out of luck. It would + // mean that finished() has been called already, + // and therefore the exception has been set using + // the Outgoing::finished() callback. In this + // case, we cannot throw the exception here, + // because we must not both raise an exception and + // have Outgoing::finished() called with an + // exception. This means that in some rare cases, + // a request will not be retried even though it + // could. But I honestly don't know how I could + // avoid this, without a very elaborate and + // complex design, which would be bad for + // performance. + // + IceInternal.Outgoing o = (IceInternal.Outgoing)_requests.remove(requestId); + if(o != null) + { + assert(o == out); + throw new IceInternal.LocalExceptionWrapper(_exception, ex.retry()); + } + } + else + { + throw new IceInternal.LocalExceptionWrapper(_exception, ex.retry()); + } + } + } + catch(LocalException ex) + { + synchronized(this) + { + setState(StateClosed, ex); + assert(_exception != null); + + if(out != null) + { + // + // If the request has already been removed from + // the request map, we are out of luck. It would + // mean that finished() has been called already, + // and therefore the exception has been set using + // the Outgoing::finished() callback. In this + // case, we cannot throw the exception here, + // because we must not both raise an exception and + // have Outgoing::finished() called with an + // exception. This means that in some rare cases, + // a request will not be retried even though it + // could. But I honestly don't know how I could + // avoid this, without a very elaborate and + // complex design, which would be bad for + // performance. + // + IceInternal.Outgoing o = (IceInternal.Outgoing)_requests.remove(requestId); + if(o != null) + { + assert(o == out); + throw _exception; + } + } + else + { + throw _exception; + } + } + } } public void sendAsyncRequest(IceInternal.BasicStream os, IceInternal.OutgoingAsync out, boolean compress) throws IceInternal.LocalExceptionWrapper { - int requestId = 0; - IceInternal.BasicStream stream = null; - - synchronized(this) - { - assert(!_endpoint.datagram()); // Twoway requests cannot be datagrams, and async implies twoway. - - if(_exception != null) - { - // - // If the connection is closed before we even have a chance - // to send our request, we always try to send the request - // again. - // - throw new IceInternal.LocalExceptionWrapper(_exception, true); - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - // - // Create a new unique request ID. - // - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } - - // - // Fill in the request ID. - // - os.pos(IceInternal.Protocol.headerSize); - os.writeInt(requestId); - - // - // Add to the async requests map. - // - _asyncRequests.put(requestId, out); - - stream = doCompress(os, _overrideCompress ? _overrideCompressValue : compress); - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; - } - } - - try - { - synchronized(_sendMutex) - { - if(_transceiver == null) // Has the transceiver already been closed? - { - assert(_exception != null); - throw _exception; // The exception is immutable at this point. - } - - // - // Send the request. - // - IceInternal.TraceUtil.traceRequest("sending asynchronous request", os, _logger, _traceLevels); - _transceiver.write(stream, _endpoint.timeout()); - } - } - catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). - { - synchronized(this) - { - setState(StateClosed, ex.get()); - assert(_exception != null); - - // - // If the request has already been removed from the - // async request map, we are out of luck. It would - // mean that finished() has been called already, and - // therefore the exception has been set using the - // OutgoingAsync::__finished() callback. In this case, - // we cannot throw the exception here, because we must - // not both raise an exception and have - // OutgoingAsync::__finished() called with an - // exception. This means that in some rare cases, a - // request will not be retried even though it - // could. But I honestly don't know how I could avoid - // this, without a very elaborate and complex design, - // which would be bad for performance. - // - IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)_asyncRequests.remove(requestId); - if(o != null) - { - assert(o == out); - throw new IceInternal.LocalExceptionWrapper(_exception, ex.retry()); - } - } - } - catch(LocalException ex) - { - synchronized(this) - { - setState(StateClosed, ex); - assert(_exception != null); - - // - // If the request has already been removed from the - // async request map, we are out of luck. It would - // mean that finished() has been called already, and - // therefore the exception has been set using the - // OutgoingAsync::__finished() callback. In this case, - // we cannot throw the exception here, because we must - // not both raise an exception and have - // OutgoingAsync::__finished() called with an - // exception. This means that in some rare cases, a - // request will not be retried even though it - // could. But I honestly don't know how I could avoid - // this, without a very elaborate and complex design, - // which would be bad for performance. - // - IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)_asyncRequests.remove(requestId); - if(o != null) - { - assert(o == out); - throw _exception; - } - } - } + int requestId = 0; + IceInternal.BasicStream stream = null; + + synchronized(this) + { + assert(!_endpoint.datagram()); // Twoway requests cannot be datagrams, and async implies twoway. + + if(_exception != null) + { + // + // If the connection is closed before we even have a chance + // to send our request, we always try to send the request + // again. + // + throw new IceInternal.LocalExceptionWrapper(_exception, true); + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + + // + // Create a new unique request ID. + // + requestId = _nextRequestId++; + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + + // + // Fill in the request ID. + // + os.pos(IceInternal.Protocol.headerSize); + os.writeInt(requestId); + + // + // Add to the async requests map. + // + _asyncRequests.put(requestId, out); + + stream = doCompress(os, _overrideCompress ? _overrideCompressValue : compress); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } + } + + try + { + synchronized(_sendMutex) + { + if(_transceiver == null) // Has the transceiver already been closed? + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } + + // + // Send the request. + // + IceInternal.TraceUtil.traceRequest("sending asynchronous request", os, _logger, _traceLevels); + _transceiver.write(stream, _endpoint.timeout()); + } + } + catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). + { + synchronized(this) + { + setState(StateClosed, ex.get()); + assert(_exception != null); + + // + // If the request has already been removed from the + // async request map, we are out of luck. It would + // mean that finished() has been called already, and + // therefore the exception has been set using the + // OutgoingAsync::__finished() callback. In this case, + // we cannot throw the exception here, because we must + // not both raise an exception and have + // OutgoingAsync::__finished() called with an + // exception. This means that in some rare cases, a + // request will not be retried even though it + // could. But I honestly don't know how I could avoid + // this, without a very elaborate and complex design, + // which would be bad for performance. + // + IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)_asyncRequests.remove(requestId); + if(o != null) + { + assert(o == out); + throw new IceInternal.LocalExceptionWrapper(_exception, ex.retry()); + } + } + } + catch(LocalException ex) + { + synchronized(this) + { + setState(StateClosed, ex); + assert(_exception != null); + + // + // If the request has already been removed from the + // async request map, we are out of luck. It would + // mean that finished() has been called already, and + // therefore the exception has been set using the + // OutgoingAsync::__finished() callback. In this case, + // we cannot throw the exception here, because we must + // not both raise an exception and have + // OutgoingAsync::__finished() called with an + // exception. This means that in some rare cases, a + // request will not be retried even though it + // could. But I honestly don't know how I could avoid + // this, without a very elaborate and complex design, + // which would be bad for performance. + // + IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)_asyncRequests.remove(requestId); + if(o != null) + { + assert(o == out); + throw _exception; + } + } + } } public synchronized void prepareBatchRequest(IceInternal.BasicStream os) { - while(_batchStreamInUse && _exception == null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } + while(_batchStreamInUse && _exception == null) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } if(_exception != null) { @@ -819,29 +819,29 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } assert(_state > StateNotValidated); - assert(_state < StateClosing); + assert(_state < StateClosing); if(_batchStream.isEmpty()) { - try - { - _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr); - } - catch(LocalException ex) - { - setState(StateClosed, ex); - throw ex; - } + try + { + _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr); + } + catch(LocalException ex) + { + setState(StateClosed, ex); + throw ex; + } } _batchStreamInUse = true; - _batchMarker = _batchStream.size(); - _batchStream.swap(os); + _batchMarker = _batchStream.size(); + _batchStream.swap(os); - // - // The batch stream now belongs to the caller, until - // finishBatchRequest() or abortBatchRequest() is called. - // + // + // The batch stream now belongs to the caller, until + // finishBatchRequest() or abortBatchRequest() is called. + // } public void @@ -859,43 +859,43 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_batchAutoFlush) { - synchronized(_sendMutex) - { - if(_transceiver == null) - { - assert(_exception != null); - throw _exception; // The exception is immutable at this point. - } - // - // Throw memory limit exception if the first - // message added causes us to go over - // limit. Otherwise put aside the marshalled - // message that caused limit to be exceeded and - // rollback stream to the marker. - try - { - _transceiver.checkSendSize(_batchStream, _instance.messageSizeMax()); - } - catch(Ice.LocalException ex) - { - if(_batchRequestNum == 0) - { - resetBatch(true); - throw ex; - } - - lastRequest = new byte[_batchStream.size() - _batchMarker]; - java.nio.ByteBuffer buffer = _batchStream.prepareRead(); - buffer.position(_batchMarker); - buffer.get(lastRequest); - _batchStream.resize(_batchMarker, false); - autoflush = true; - } - } - } - - if(!autoflush) - { + synchronized(_sendMutex) + { + if(_transceiver == null) + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } + // + // Throw memory limit exception if the first + // message added causes us to go over + // limit. Otherwise put aside the marshalled + // message that caused limit to be exceeded and + // rollback stream to the marker. + try + { + _transceiver.checkSendSize(_batchStream, _instance.messageSizeMax()); + } + catch(Ice.LocalException ex) + { + if(_batchRequestNum == 0) + { + resetBatch(true); + throw ex; + } + + lastRequest = new byte[_batchStream.size() - _batchMarker]; + java.nio.ByteBuffer buffer = _batchStream.prepareRead(); + buffer.position(_batchMarker); + buffer.get(lastRequest); + _batchStream.resize(_batchMarker, false); + autoflush = true; + } + } + } + + if(!autoflush) + { // // Increment the number of requests in the batch. // @@ -989,10 +989,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private void flushBatchRequestsInternal(boolean ignoreInUse) { - IceInternal.BasicStream stream = null; + IceInternal.BasicStream stream = null; - synchronized(this) - { + synchronized(this) + { if(!ignoreInUse) { while(_batchStreamInUse && _exception == null) @@ -1006,98 +1006,98 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } } - - if(_exception != null) - { - throw _exception; - } - - if(_batchStream.isEmpty()) - { - return; // Nothing to do. - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - // - // Fill in the message size. - // - _batchStream.pos(10); - _batchStream.writeInt(_batchStream.size()); - - // - // Fill in the number of requests in the batch. - // - _batchStream.writeInt(_batchRequestNum); - - stream = doCompress(_batchStream, _overrideCompress ? _overrideCompressValue : _batchRequestCompress); - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; - } - - // - // Prevent that new batch requests are added while we are - // flushing. - // - _batchStreamInUse = true; - } - - try - { - synchronized(_sendMutex) - { - if(_transceiver == null) // Has the transceiver already been closed? - { - assert(_exception != null); - throw _exception; // The exception is immutable at this point. - } - - // - // Send the batch request. - // - IceInternal.TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); - _transceiver.write(stream, _endpoint.timeout()); - } - } - catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). - { - synchronized(this) - { - setState(StateClosed, ex.get()); - assert(_exception != null); - - // - // Since batch requests are all oneways (or datagrams), we - // must report the exception to the caller. - // - throw _exception; - } - } - catch(LocalException ex) - { - synchronized(this) - { - setState(StateClosed, ex); - assert(_exception != null); - - // - // Since batch requests are all oneways (or datagrams), we - // must report the exception to the caller. - // - throw _exception; - } - } - - synchronized(this) - { - // - // Reset the batch stream, and notify that flushing is over. - // + + if(_exception != null) + { + throw _exception; + } + + if(_batchStream.isEmpty()) + { + return; // Nothing to do. + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + + // + // Fill in the message size. + // + _batchStream.pos(10); + _batchStream.writeInt(_batchStream.size()); + + // + // Fill in the number of requests in the batch. + // + _batchStream.writeInt(_batchRequestNum); + + stream = doCompress(_batchStream, _overrideCompress ? _overrideCompressValue : _batchRequestCompress); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } + + // + // Prevent that new batch requests are added while we are + // flushing. + // + _batchStreamInUse = true; + } + + try + { + synchronized(_sendMutex) + { + if(_transceiver == null) // Has the transceiver already been closed? + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } + + // + // Send the batch request. + // + IceInternal.TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + _transceiver.write(stream, _endpoint.timeout()); + } + } + catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). + { + synchronized(this) + { + setState(StateClosed, ex.get()); + assert(_exception != null); + + // + // Since batch requests are all oneways (or datagrams), we + // must report the exception to the caller. + // + throw _exception; + } + } + catch(LocalException ex) + { + synchronized(this) + { + setState(StateClosed, ex); + assert(_exception != null); + + // + // Since batch requests are all oneways (or datagrams), we + // must report the exception to the caller. + // + throw _exception; + } + } + + synchronized(this) + { + // + // Reset the batch stream, and notify that flushing is over. + // resetBatch(!ignoreInUse); - } + } } private void @@ -1121,98 +1121,98 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public void sendResponse(IceInternal.BasicStream os, byte compressFlag) { - IceInternal.BasicStream stream = null; - try - { - synchronized(_sendMutex) - { - if(_transceiver == null) // Has the transceiver already been closed? - { - assert(_exception != null); - throw _exception; // The exception is immutable at this point. - } - - stream = doCompress(os, compressFlag != 0); - - // - // Send the reply. - // - IceInternal.TraceUtil.traceReply("sending reply", os, _logger, _traceLevels); - _transceiver.write(stream, _endpoint.timeout()); - } - } - catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). - { - synchronized(this) - { - setState(StateClosed, ex.get()); - } - } - catch(LocalException ex) - { - synchronized(this) - { - setState(StateClosed, ex); - } - } - - synchronized(this) - { - assert(_state > StateNotValidated); - - try - { - if(--_dispatchCount == 0) - { - notifyAll(); - } - - if(_state == StateClosing && _dispatchCount == 0) - { - initiateShutdown(); - } - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; - } - } - catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). - { - setState(StateClosed, ex.get()); - } - catch(LocalException ex) - { - setState(StateClosed, ex); - } - } + IceInternal.BasicStream stream = null; + try + { + synchronized(_sendMutex) + { + if(_transceiver == null) // Has the transceiver already been closed? + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } + + stream = doCompress(os, compressFlag != 0); + + // + // Send the reply. + // + IceInternal.TraceUtil.traceReply("sending reply", os, _logger, _traceLevels); + _transceiver.write(stream, _endpoint.timeout()); + } + } + catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). + { + synchronized(this) + { + setState(StateClosed, ex.get()); + } + } + catch(LocalException ex) + { + synchronized(this) + { + setState(StateClosed, ex); + } + } + + synchronized(this) + { + assert(_state > StateNotValidated); + + try + { + if(--_dispatchCount == 0) + { + notifyAll(); + } + + if(_state == StateClosing && _dispatchCount == 0) + { + initiateShutdown(); + } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } + } + catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). + { + setState(StateClosed, ex.get()); + } + catch(LocalException ex) + { + setState(StateClosed, ex); + } + } } public synchronized void sendNoResponse() { - assert(_state > StateNotValidated); - - try - { - if(--_dispatchCount == 0) - { - notifyAll(); - } - - if(_state == StateClosing && _dispatchCount == 0) - { - initiateShutdown(); - } - } - catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). - { - setState(StateClosed, ex.get()); - } - catch(LocalException ex) - { - setState(StateClosed, ex); - } + assert(_state > StateNotValidated); + + try + { + if(--_dispatchCount == 0) + { + notifyAll(); + } + + if(_state == StateClosing && _dispatchCount == 0) + { + initiateShutdown(); + } + } + catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write(). + { + setState(StateClosed, ex.get()); + } + catch(LocalException ex) + { + setState(StateClosed, ex); + } } public IceInternal.EndpointI @@ -1231,39 +1231,39 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public synchronized void setAdapter(ObjectAdapter adapter) { - if(_exception != null) - { - throw _exception; - } - - assert(_state < StateClosing); - - _adapter = adapter; - - if(_adapter != null) - { - _servantManager = ((ObjectAdapterI)_adapter).getServantManager(); - if(_servantManager == null) - { - _adapter = null; - } - } - else - { - _servantManager = null; - } - - // - // We never change the thread pool with which we were - // initially registered, even if we add or remove an object - // adapter. - // + if(_exception != null) + { + throw _exception; + } + + assert(_state < StateClosing); + + _adapter = adapter; + + if(_adapter != null) + { + _servantManager = ((ObjectAdapterI)_adapter).getServantManager(); + if(_servantManager == null) + { + _adapter = null; + } + } + else + { + _servantManager = null; + } + + // + // We never change the thread pool with which we were + // initially registered, even if we add or remove an object + // adapter. + // } public synchronized ObjectAdapter getAdapter() { - return _adapter; + return _adapter; } public synchronized ObjectPrx @@ -1276,7 +1276,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne ConnectionI[] connections = new ConnectionI[1]; connections[0] = this; IceInternal.Reference ref = - _instance.referenceFactory().create(ident, _instance.getDefaultContext(), "", + _instance.referenceFactory().create(ident, _instance.getDefaultContext(), "", IceInternal.Reference.ModeTwoway, connections); return _instance.proxyFactory().referenceToProxy(ref); } @@ -1288,205 +1288,205 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public boolean datagram() { - assert(!_threadPerConnection); // Only for use with a thread pool. - return _endpoint.datagram(); // No mutex protection necessary, _endpoint is immutable. + assert(!_threadPerConnection); // Only for use with a thread pool. + return _endpoint.datagram(); // No mutex protection necessary, _endpoint is immutable. } public boolean readable() { - assert(!_threadPerConnection); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. return true; } public boolean read(IceInternal.BasicStream stream) { - assert(!_threadPerConnection); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. - return _transceiver.read(stream, 0); + return _transceiver.read(stream, 0); - // - // Updating _acmAbsoluteTimeoutMillis is too expensive here, - // because we would have to acquire a lock just for this - // purpose. Instead, we update _acmAbsoluteTimeoutMillis in - // message(). - // + // + // Updating _acmAbsoluteTimeoutMillis is too expensive here, + // because we would have to acquire a lock just for this + // purpose. Instead, we update _acmAbsoluteTimeoutMillis in + // message(). + // } public void message(IceInternal.BasicStream stream, IceInternal.ThreadPool threadPool) { - assert(!_threadPerConnection); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. - MessageInfo info = new MessageInfo(stream); + MessageInfo info = new MessageInfo(stream); synchronized(this) { - // - // We must promote within the synchronization, otherwise - // there could be various race conditions with close - // connection messages and other messages. - // - threadPool.promoteFollower(); + // + // We must promote within the synchronization, otherwise + // there could be various race conditions with close + // connection messages and other messages. + // + threadPool.promoteFollower(); if(_state != StateClosed) { - parseMessage(info); - } - - // - // parseMessage() can close the connection, so we must check - // for closed state again. - // - if(_state == StateClosed) - { - return; - } - } - - // - // Asynchronous replies must be handled outside the thread - // synchronization, so that nested calls are possible. - // - if(info.outAsync != null) - { - info.outAsync.__finished(info.stream); - } - - // - // Method invocation (or multiple invocations for batch messages) - // must be done outside the thread synchronization, so that nested - // calls are possible. - // - invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter); + parseMessage(info); + } + + // + // parseMessage() can close the connection, so we must check + // for closed state again. + // + if(_state == StateClosed) + { + return; + } + } + + // + // Asynchronous replies must be handled outside the thread + // synchronization, so that nested calls are possible. + // + if(info.outAsync != null) + { + info.outAsync.__finished(info.stream); + } + + // + // Method invocation (or multiple invocations for batch messages) + // must be done outside the thread synchronization, so that nested + // calls are possible. + // + invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter); } public void finished(IceInternal.ThreadPool threadPool) { - assert(!_threadPerConnection); // Only for use with a thread pool. - - threadPool.promoteFollower(); - - LocalException localEx = null; - - IceInternal.IntMap requests = null; - IceInternal.IntMap asyncRequests = null; - - synchronized(this) - { - --_finishedCount; - if(_finishedCount == 0 && _state == StateClosed) - { - // - // We must make sure that nobody is sending when we - // close the transceiver. - // - synchronized(_sendMutex) - { - try - { - _transceiver.close(); - } - catch(LocalException ex) - { - localEx = ex; - } - - _transceiver = null; - notifyAll(); - } - } - - if(_state == StateClosed || _state == StateClosing) - { - requests = _requests; - _requests = new IceInternal.IntMap(); - - asyncRequests = _asyncRequests; - _asyncRequests = new IceInternal.IntMap(); - } - } - - if(requests != null) - { - java.util.Iterator i = requests.entryIterator(); - while(i.hasNext()) - { - IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next(); - IceInternal.Outgoing out = (IceInternal.Outgoing)e.getValue(); - out.finished(_exception); // The exception is immutable at this point. - } - } - - if(asyncRequests != null) - { - java.util.Iterator i = asyncRequests.entryIterator(); - while(i.hasNext()) - { - IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next(); - IceInternal.OutgoingAsync out = (IceInternal.OutgoingAsync)e.getValue(); - out.__finished(_exception); // The exception is immutable at this point. - } - } - - if(localEx != null) - { - throw localEx; - } + assert(!_threadPerConnection); // Only for use with a thread pool. + + threadPool.promoteFollower(); + + LocalException localEx = null; + + IceInternal.IntMap requests = null; + IceInternal.IntMap asyncRequests = null; + + synchronized(this) + { + --_finishedCount; + if(_finishedCount == 0 && _state == StateClosed) + { + // + // We must make sure that nobody is sending when we + // close the transceiver. + // + synchronized(_sendMutex) + { + try + { + _transceiver.close(); + } + catch(LocalException ex) + { + localEx = ex; + } + + _transceiver = null; + notifyAll(); + } + } + + if(_state == StateClosed || _state == StateClosing) + { + requests = _requests; + _requests = new IceInternal.IntMap(); + + asyncRequests = _asyncRequests; + _asyncRequests = new IceInternal.IntMap(); + } + } + + if(requests != null) + { + java.util.Iterator i = requests.entryIterator(); + while(i.hasNext()) + { + IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next(); + IceInternal.Outgoing out = (IceInternal.Outgoing)e.getValue(); + out.finished(_exception); // The exception is immutable at this point. + } + } + + if(asyncRequests != null) + { + java.util.Iterator i = asyncRequests.entryIterator(); + while(i.hasNext()) + { + IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next(); + IceInternal.OutgoingAsync out = (IceInternal.OutgoingAsync)e.getValue(); + out.__finished(_exception); // The exception is immutable at this point. + } + } + + if(localEx != null) + { + throw localEx; + } } public synchronized void exception(LocalException ex) { - setState(StateClosed, ex); + setState(StateClosed, ex); } public synchronized void invokeException(LocalException ex, int invokeNum) { - // - // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't - // called in case of a fatal exception we decrement _dispatchCount here. - // - - setState(StateClosed, ex); - - if(invokeNum > 0) - { - assert(_dispatchCount > 0); - _dispatchCount -= invokeNum; - assert(_dispatchCount >= 0); - if(_dispatchCount == 0) - { - notifyAll(); - } - } + // + // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't + // called in case of a fatal exception we decrement _dispatchCount here. + // + + setState(StateClosed, ex); + + if(invokeNum > 0) + { + assert(_dispatchCount > 0); + _dispatchCount -= invokeNum; + assert(_dispatchCount >= 0); + if(_dispatchCount == 0) + { + notifyAll(); + } + } } public String type() { - return _type; // No mutex lock, _type is immutable. + return _type; // No mutex lock, _type is immutable. } public int timeout() { - return _endpoint.timeout(); // No mutex protection necessary, _endpoint is immutable. + return _endpoint.timeout(); // No mutex protection necessary, _endpoint is immutable. } public String toString() { - return _toString(); + return _toString(); } public String _toString() { - return _desc; // No mutex lock, _desc is immutable. + return _desc; // No mutex lock, _desc is immutable. } // @@ -1498,143 +1498,143 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public IceInternal.Transceiver getTransceiver() { - return _transceiver; + return _transceiver; } public ConnectionI(IceInternal.Instance instance, IceInternal.Transceiver transceiver, - IceInternal.EndpointI endpoint, ObjectAdapter adapter, boolean threadPerConnection) + IceInternal.EndpointI endpoint, ObjectAdapter adapter, boolean threadPerConnection) { super(instance); _threadPerConnection = threadPerConnection; _transceiver = transceiver; - _desc = transceiver.toString(); + _desc = transceiver.toString(); _type = transceiver.type(); _endpoint = endpoint; _adapter = adapter; _logger = instance.initializationData().logger; // Cached for better performance. _traceLevels = instance.traceLevels(); // Cached for better performance. - _registeredWithPool = false; - _finishedCount = 0; - _warn = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; - _cacheBuffers = _instance.initializationData().properties.getPropertyAsIntWithDefault( - "Ice.CacheMessageBuffers", 1) == 1; - _acmAbsoluteTimeoutMillis = 0; + _registeredWithPool = false; + _finishedCount = 0; + _warn = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; + _cacheBuffers = _instance.initializationData().properties.getPropertyAsIntWithDefault( + "Ice.CacheMessageBuffers", 1) == 1; + _acmAbsoluteTimeoutMillis = 0; _nextRequestId = 1; - _batchAutoFlush = _instance.initializationData().properties.getPropertyAsIntWithDefault( + _batchAutoFlush = _instance.initializationData().properties.getPropertyAsIntWithDefault( "Ice.BatchAutoFlush", 1) > 0 ? true : false; _batchStream = new IceInternal.BasicStream(instance, _batchAutoFlush); - _batchStreamInUse = false; - _batchRequestNum = 0; - _batchRequestCompress = false; + _batchStreamInUse = false; + _batchRequestNum = 0; + _batchRequestCompress = false; _dispatchCount = 0; _state = StateNotValidated; - _stateTime = System.currentTimeMillis(); - - if(_endpoint.datagram()) - { - _acmTimeout = 0; - } - else - { - if(_adapter != null) - { - _acmTimeout = _instance.serverACM(); - } - else - { - _acmTimeout = _instance.clientACM(); - } - } - - int compressionLevel = - _instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Compression.Level", 1); - if(compressionLevel < 1) - { - compressionLevel = 1; - } - else if(compressionLevel > 9) - { - compressionLevel = 9; - } - _compressionLevel = compressionLevel; - - if(_adapter != null) - { - _servantManager = ((ObjectAdapterI)_adapter).getServantManager(); - } - else - { - _servantManager = null; - } - - try - { - if(!threadPerConnection) - { - // - // Only set _threadPool if we really need it, i.e., if we are - // not in thread per connection mode. Thread pools have lazy - // initialization in Instance, and we don't want them to be - // created if they are not needed. - // - if(_adapter != null) - { - _threadPool = ((ObjectAdapterI)_adapter).getThreadPool(); - } - else - { - _threadPool = _instance.clientThreadPool(); - } - } - else - { - _threadPool = null; - - // - // If we are in thread per connection mode, create the thread - // for this connection. - // - _thread = new ThreadPerConnection(); - _thread.start(); - } - } - catch(java.lang.Exception ex) - { - if(threadPerConnection) - { - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - _logger.error("cannot create thread for connection:\n" + sw.toString()); - } - - try - { - _transceiver.close(); - } - catch(LocalException e) - { - // Here we ignore any exceptions in close(). - } - - Ice.SyscallException e = new Ice.SyscallException(); - e.initCause(ex); - throw e; - } - - _overrideCompress = _instance.defaultsAndOverrides().overrideCompress; - _overrideCompressValue = _instance.defaultsAndOverrides().overrideCompressValue; + _stateTime = System.currentTimeMillis(); + + if(_endpoint.datagram()) + { + _acmTimeout = 0; + } + else + { + if(_adapter != null) + { + _acmTimeout = _instance.serverACM(); + } + else + { + _acmTimeout = _instance.clientACM(); + } + } + + int compressionLevel = + _instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Compression.Level", 1); + if(compressionLevel < 1) + { + compressionLevel = 1; + } + else if(compressionLevel > 9) + { + compressionLevel = 9; + } + _compressionLevel = compressionLevel; + + if(_adapter != null) + { + _servantManager = ((ObjectAdapterI)_adapter).getServantManager(); + } + else + { + _servantManager = null; + } + + try + { + if(!threadPerConnection) + { + // + // Only set _threadPool if we really need it, i.e., if we are + // not in thread per connection mode. Thread pools have lazy + // initialization in Instance, and we don't want them to be + // created if they are not needed. + // + if(_adapter != null) + { + _threadPool = ((ObjectAdapterI)_adapter).getThreadPool(); + } + else + { + _threadPool = _instance.clientThreadPool(); + } + } + else + { + _threadPool = null; + + // + // If we are in thread per connection mode, create the thread + // for this connection. + // + _thread = new ThreadPerConnection(); + _thread.start(); + } + } + catch(java.lang.Exception ex) + { + if(threadPerConnection) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + _logger.error("cannot create thread for connection:\n" + sw.toString()); + } + + try + { + _transceiver.close(); + } + catch(LocalException e) + { + // Here we ignore any exceptions in close(). + } + + Ice.SyscallException e = new Ice.SyscallException(); + e.initCause(ex); + throw e; + } + + _overrideCompress = _instance.defaultsAndOverrides().overrideCompress; + _overrideCompressValue = _instance.defaultsAndOverrides().overrideCompressValue; } protected synchronized void finalize() throws Throwable { - IceUtil.Assert.FinalizerAssert(_state == StateClosed); - IceUtil.Assert.FinalizerAssert(_transceiver == null); - IceUtil.Assert.FinalizerAssert(_dispatchCount == 0); - IceUtil.Assert.FinalizerAssert(_thread == null); + IceUtil.Assert.FinalizerAssert(_state == StateClosed); + IceUtil.Assert.FinalizerAssert(_transceiver == null); + IceUtil.Assert.FinalizerAssert(_dispatchCount == 0); + IceUtil.Assert.FinalizerAssert(_thread == null); super.finalize(); } @@ -1648,11 +1648,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private void setState(int state, LocalException ex) { - // - // If setState() is called with an exception, then only closed - // and closing states are permissible. - // - assert(state == StateClosing || state == StateClosed); + // + // If setState() is called with an exception, then only closed + // and closing states are permissible. + // + assert(state == StateClosing || state == StateClosed); if(_state == state) // Don't switch twice. { @@ -1661,36 +1661,36 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_exception == null) { - _exception = ex; + _exception = ex; if(_warn) { - // - // We don't warn if we are not validated. - // - if(_state > StateNotValidated) - { - // - // Don't warn about certain expected exceptions. - // - if(!(_exception instanceof CloseConnectionException || - _exception instanceof ForcedCloseConnectionException || - _exception instanceof ConnectionTimeoutException || - _exception instanceof CommunicatorDestroyedException || - _exception instanceof ObjectAdapterDeactivatedException || - (_exception instanceof ConnectionLostException && _state == StateClosing))) - { - warning("connection exception", _exception); - } - } - } - } - - // - // We must set the new state before we notify requests of any - // exceptions. Otherwise new requests may retry on a - // connection that is not yet marked as closed or closing. - // + // + // We don't warn if we are not validated. + // + if(_state > StateNotValidated) + { + // + // Don't warn about certain expected exceptions. + // + if(!(_exception instanceof CloseConnectionException || + _exception instanceof ForcedCloseConnectionException || + _exception instanceof ConnectionTimeoutException || + _exception instanceof CommunicatorDestroyedException || + _exception instanceof ObjectAdapterDeactivatedException || + (_exception instanceof ConnectionLostException && _state == StateClosing))) + { + warning("connection exception", _exception); + } + } + } + } + + // + // We must set the new state before we notify requests of any + // exceptions. Otherwise new requests may retry on a + // connection that is not yet marked as closed or closing. + // setState(state); } @@ -1706,13 +1706,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne state = StateClosed; } - // - // Skip graceful shutdown if we are destroyed before validation. - // - if(_state == StateNotValidated && state == StateClosing) - { - state = StateClosed; - } + // + // Skip graceful shutdown if we are destroyed before validation. + // + if(_state == StateNotValidated && state == StateClosing) + { + state = StateClosed; + } if(_state == state) // Don't switch twice. { @@ -1721,147 +1721,147 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne switch(state) { - case StateNotValidated: - { - assert(false); - break; - } + case StateNotValidated: + { + assert(false); + break; + } case StateActive: { - // - // Can only switch from holding or not validated to - // active. - // + // + // Can only switch from holding or not validated to + // active. + // if(_state != StateHolding && _state != StateNotValidated) { return; } - if(!_threadPerConnection) - { - registerWithPool(); - } + if(!_threadPerConnection) + { + registerWithPool(); + } break; } case StateHolding: { - // - // Can only switch from active or not validated to - // holding. - // - if(_state != StateActive && _state != StateNotValidated) - { + // + // Can only switch from active or not validated to + // holding. + // + if(_state != StateActive && _state != StateNotValidated) + { return; } - if(!_threadPerConnection) - { - unregisterWithPool(); - } + if(!_threadPerConnection) + { + unregisterWithPool(); + } break; } case StateClosing: { - // - // Can't change back from closed. - // + // + // Can't change back from closed. + // if(_state == StateClosed) { return; } - if(!_threadPerConnection) - { - registerWithPool(); // We need to continue to read in closing state. - } + if(!_threadPerConnection) + { + registerWithPool(); // We need to continue to read in closing state. + } break; } - + case StateClosed: { - if(_threadPerConnection) - { - // - // If we are in thread per connection mode, we - // shutdown both for reading and writing. This will - // unblock and read call with an exception. The thread - // per connection then closes the transceiver. - // - _transceiver.shutdownReadWrite(); - } - else if(_state == StateNotValidated) - { - // - // If we change from not validated, we can close right - // away. - // - assert(!_registeredWithPool); - - // - // We must make sure that nobody is sending when we - // close the transceiver. - // - synchronized(_sendMutex) - { - try - { - _transceiver.close(); - } - catch(LocalException ex) - { - // Here we ignore any exceptions in close(). - } - - _transceiver = null; - //notifyAll(); // We notify already below. - } - } - else - { - // - // Otherwise we first must make sure that we are - // registered, then we unregister, and let finished() - // do the close. - // - registerWithPool(); - unregisterWithPool(); - - // - // We must prevent any further writes when _state == StateClosed. - // However, functions such as sendResponse cannot acquire the main - // mutex in order to check _state. Therefore we shut down the write - // end of the transceiver, which causes subsequent write attempts - // to fail with an exception. - // - _transceiver.shutdownWrite(); - } - break; - } - } - - // - // We only register with the connection monitor if our new state - // is StateActive. Otherwise we unregister with the connection - // monitor, but only if we were registered before, i.e., if our - // old state was StateActive. - // - IceInternal.ConnectionMonitor connectionMonitor = _instance.connectionMonitor(); - if(connectionMonitor != null) - { - if(state == StateActive) - { - connectionMonitor.add(this); - } - else if(_state == StateActive) - { - connectionMonitor.remove(this); - } - } + if(_threadPerConnection) + { + // + // If we are in thread per connection mode, we + // shutdown both for reading and writing. This will + // unblock and read call with an exception. The thread + // per connection then closes the transceiver. + // + _transceiver.shutdownReadWrite(); + } + else if(_state == StateNotValidated) + { + // + // If we change from not validated, we can close right + // away. + // + assert(!_registeredWithPool); + + // + // We must make sure that nobody is sending when we + // close the transceiver. + // + synchronized(_sendMutex) + { + try + { + _transceiver.close(); + } + catch(LocalException ex) + { + // Here we ignore any exceptions in close(). + } + + _transceiver = null; + //notifyAll(); // We notify already below. + } + } + else + { + // + // Otherwise we first must make sure that we are + // registered, then we unregister, and let finished() + // do the close. + // + registerWithPool(); + unregisterWithPool(); + + // + // We must prevent any further writes when _state == StateClosed. + // However, functions such as sendResponse cannot acquire the main + // mutex in order to check _state. Therefore we shut down the write + // end of the transceiver, which causes subsequent write attempts + // to fail with an exception. + // + _transceiver.shutdownWrite(); + } + break; + } + } + + // + // We only register with the connection monitor if our new state + // is StateActive. Otherwise we unregister with the connection + // monitor, but only if we were registered before, i.e., if our + // old state was StateActive. + // + IceInternal.ConnectionMonitor connectionMonitor = _instance.connectionMonitor(); + if(connectionMonitor != null) + { + if(state == StateActive) + { + connectionMonitor.add(this); + } + else if(_state == StateActive) + { + connectionMonitor.remove(this); + } + } _state = state; - _stateTime = System.currentTimeMillis(); + _stateTime = System.currentTimeMillis(); - notifyAll(); + notifyAll(); if(_state == StateClosing && _dispatchCount == 0) { @@ -1882,665 +1882,665 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private void initiateShutdown() - throws IceInternal.LocalExceptionWrapper // Java-specific workaround in Transceiver.write(). + throws IceInternal.LocalExceptionWrapper // Java-specific workaround in Transceiver.write(). { - assert(_state == StateClosing); - assert(_dispatchCount == 0); - - if(!_endpoint.datagram()) - { - synchronized(_sendMutex) - { - // - // Before we shut down, we send a close connection - // message. - // - IceInternal.BasicStream os = new IceInternal.BasicStream(_instance); - os.writeBlob(IceInternal.Protocol.magic); - os.writeByte(IceInternal.Protocol.protocolMajor); - os.writeByte(IceInternal.Protocol.protocolMinor); - os.writeByte(IceInternal.Protocol.encodingMajor); - os.writeByte(IceInternal.Protocol.encodingMinor); - os.writeByte(IceInternal.Protocol.closeConnectionMsg); - os.writeByte(_compressionSupported ? (byte)1 : (byte)0); - os.writeInt(IceInternal.Protocol.headerSize); // Message size. - - // - // Send the message. - // - IceInternal.TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); - // - // The CloseConnection message should be sufficient. Closing the write - // end of the socket is probably an artifact of how things were done - // in IIOP. In fact, shutting down the write end of the socket causes - // problems on Windows by preventing the peer from using the socket. - // For example, the peer is no longer able to continue writing a large - // message after the socket is shutdown. - // - //_transceiver.shutdownWrite(); - } - } + assert(_state == StateClosing); + assert(_dispatchCount == 0); + + if(!_endpoint.datagram()) + { + synchronized(_sendMutex) + { + // + // Before we shut down, we send a close connection + // message. + // + IceInternal.BasicStream os = new IceInternal.BasicStream(_instance); + os.writeBlob(IceInternal.Protocol.magic); + os.writeByte(IceInternal.Protocol.protocolMajor); + os.writeByte(IceInternal.Protocol.protocolMinor); + os.writeByte(IceInternal.Protocol.encodingMajor); + os.writeByte(IceInternal.Protocol.encodingMinor); + os.writeByte(IceInternal.Protocol.closeConnectionMsg); + os.writeByte(_compressionSupported ? (byte)1 : (byte)0); + os.writeInt(IceInternal.Protocol.headerSize); // Message size. + + // + // Send the message. + // + IceInternal.TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); + // + // The CloseConnection message should be sufficient. Closing the write + // end of the socket is probably an artifact of how things were done + // in IIOP. In fact, shutting down the write end of the socket causes + // problems on Windows by preventing the peer from using the socket. + // For example, the peer is no longer able to continue writing a large + // message after the socket is shutdown. + // + //_transceiver.shutdownWrite(); + } + } } private void registerWithPool() { - assert(!_threadPerConnection); // Only for use with a thread pool. + assert(!_threadPerConnection); // Only for use with a thread pool. - if(!_registeredWithPool) - { - _threadPool._register(_transceiver.fd(), this); - _registeredWithPool = true; - } + if(!_registeredWithPool) + { + _threadPool._register(_transceiver.fd(), this); + _registeredWithPool = true; + } } private void unregisterWithPool() { - assert(!_threadPerConnection); // Only for use with a thread pool. - - if(_registeredWithPool) - { - _threadPool.unregister(_transceiver.fd()); - _registeredWithPool = false; - ++_finishedCount; // For each unregistration, finished() is called once. - } + assert(!_threadPerConnection); // Only for use with a thread pool. + + if(_registeredWithPool) + { + _threadPool.unregister(_transceiver.fd()); + _registeredWithPool = false; + ++_finishedCount; // For each unregistration, finished() is called once. + } } private IceInternal.BasicStream doCompress(IceInternal.BasicStream uncompressed, boolean compress) { - if(_compressionSupported) - { - if(compress && uncompressed.size() >= 100) - { - // - // Do compression. - // - IceInternal.BasicStream cstream = uncompressed.compress(IceInternal.Protocol.headerSize, - _compressionLevel); - if(cstream != null) - { - // - // Set compression status. - // - cstream.pos(9); - cstream.writeByte((byte)2); - - // - // Write the size of the compressed stream into the header. - // - cstream.pos(10); - cstream.writeInt(cstream.size()); - - // - // Write the compression status and size of the compressed stream into the header of the - // uncompressed stream -- we need this to trace requests correctly. - // - uncompressed.pos(9); - uncompressed.writeByte((byte)2); - uncompressed.writeInt(cstream.size()); - - return cstream; - } - } - } - - uncompressed.pos(9); - uncompressed.writeByte((byte)((_compressionSupported && compress) ? 1 : 0)); - - // - // Not compressed, fill in the message size. - // - uncompressed.pos(10); - uncompressed.writeInt(uncompressed.size()); - - return uncompressed; + if(_compressionSupported) + { + if(compress && uncompressed.size() >= 100) + { + // + // Do compression. + // + IceInternal.BasicStream cstream = uncompressed.compress(IceInternal.Protocol.headerSize, + _compressionLevel); + if(cstream != null) + { + // + // Set compression status. + // + cstream.pos(9); + cstream.writeByte((byte)2); + + // + // Write the size of the compressed stream into the header. + // + cstream.pos(10); + cstream.writeInt(cstream.size()); + + // + // Write the compression status and size of the compressed stream into the header of the + // uncompressed stream -- we need this to trace requests correctly. + // + uncompressed.pos(9); + uncompressed.writeByte((byte)2); + uncompressed.writeInt(cstream.size()); + + return cstream; + } + } + } + + uncompressed.pos(9); + uncompressed.writeByte((byte)((_compressionSupported && compress) ? 1 : 0)); + + // + // Not compressed, fill in the message size. + // + uncompressed.pos(10); + uncompressed.writeInt(uncompressed.size()); + + return uncompressed; } private static class MessageInfo { - MessageInfo(IceInternal.BasicStream stream) - { - this.stream = stream; - } - - IceInternal.BasicStream stream; - int invokeNum; - int requestId; - byte compress; - IceInternal.ServantManager servantManager; - ObjectAdapter adapter; - IceInternal.OutgoingAsync outAsync; + MessageInfo(IceInternal.BasicStream stream) + { + this.stream = stream; + } + + IceInternal.BasicStream stream; + int invokeNum; + int requestId; + byte compress; + IceInternal.ServantManager servantManager; + ObjectAdapter adapter; + IceInternal.OutgoingAsync outAsync; } private void parseMessage(MessageInfo info) { - assert(_state > StateNotValidated && _state < StateClosed); - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; - } - - try - { - // - // We don't need to check magic and version here. This has - // already been done by the ThreadPool or ThreadPerConnection, - // which provides us with the stream. - // - assert(info.stream.pos() == info.stream.size()); - info.stream.pos(8); - byte messageType = info.stream.readByte(); - info.compress = info.stream.readByte(); - if(info.compress == (byte)2) - { - if(_compressionSupported) - { - IceInternal.BasicStream ustream = info.stream.uncompress(IceInternal.Protocol.headerSize); - if(ustream != info.stream) - { - info.stream = ustream; - } - } - else - { - FeatureNotSupportedException ex = new FeatureNotSupportedException(); - ex.unsupportedFeature = "Cannot uncompress compressed message: " - + "org.apache.tools.bzip2.CBZip2OutputStream was not found"; - throw ex; - } - } - info.stream.pos(IceInternal.Protocol.headerSize); - - switch(messageType) - { - case IceInternal.Protocol.closeConnectionMsg: - { - IceInternal.TraceUtil.traceHeader("received close connection", info.stream, _logger, _traceLevels); - if(_endpoint.datagram()) - { - if(_warn) - { - _logger.warning("ignoring close connection message for datagram connection:\n" + _desc); - } - } - else - { - setState(StateClosed, new CloseConnectionException()); - } - break; - } - - case IceInternal.Protocol.requestMsg: - { - if(_state == StateClosing) - { - IceInternal.TraceUtil.traceRequest("received request during closing\n" + - "(ignored by server, client will retry)", - info.stream, _logger, _traceLevels); - } - else - { - IceInternal.TraceUtil.traceRequest("received request", info.stream, _logger, _traceLevels); - info.requestId = info.stream.readInt(); - info.invokeNum = 1; - info.servantManager = _servantManager; - info.adapter = _adapter; - ++_dispatchCount; - } - break; - } - - case IceInternal.Protocol.requestBatchMsg: - { - if(_state == StateClosing) - { - IceInternal.TraceUtil.traceBatchRequest("received batch request during closing\n" + - "(ignored by server, client will retry)", - info.stream, _logger, _traceLevels); - } - else - { - IceInternal.TraceUtil.traceBatchRequest("received batch request", info.stream, _logger, - _traceLevels); - info.invokeNum = info.stream.readInt(); - if(info.invokeNum < 0) - { - info.invokeNum = 0; - throw new NegativeSizeException(); - } - info.servantManager = _servantManager; - info.adapter = _adapter; - _dispatchCount += info.invokeNum; - } - break; - } - - case IceInternal.Protocol.replyMsg: - { - IceInternal.TraceUtil.traceReply("received reply", info.stream, _logger, _traceLevels); - info.requestId = info.stream.readInt(); - IceInternal.Outgoing out = (IceInternal.Outgoing)_requests.remove(info.requestId); - if(out != null) - { - out.finished(info.stream); - } - else - { - info.outAsync = (IceInternal.OutgoingAsync)_asyncRequests.remove(info.requestId); - if(info.outAsync == null) - { - throw new UnknownRequestIdException(); - } - } - break; - } - - case IceInternal.Protocol.validateConnectionMsg: - { - IceInternal.TraceUtil.traceHeader("received validate connection", info.stream, _logger, - _traceLevels); - if(_warn) - { - _logger.warning("ignoring unexpected validate connection message:\n" + _desc); - } - break; - } - - default: - { - IceInternal.TraceUtil.traceHeader("received unknown message\n" + - "(invalid, closing connection)", info.stream, _logger, - _traceLevels); - throw new UnknownMessageException(); - } - } - } - catch(SocketException ex) - { - setState(StateClosed, ex); - } - catch(LocalException ex) - { - if(_endpoint.datagram()) - { - if(_warn) - { - _logger.warning("udp connection exception:\n" + ex + _desc); - } - } - else - { - setState(StateClosed, ex); - } - } + assert(_state > StateNotValidated && _state < StateClosed); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } + + try + { + // + // We don't need to check magic and version here. This has + // already been done by the ThreadPool or ThreadPerConnection, + // which provides us with the stream. + // + assert(info.stream.pos() == info.stream.size()); + info.stream.pos(8); + byte messageType = info.stream.readByte(); + info.compress = info.stream.readByte(); + if(info.compress == (byte)2) + { + if(_compressionSupported) + { + IceInternal.BasicStream ustream = info.stream.uncompress(IceInternal.Protocol.headerSize); + if(ustream != info.stream) + { + info.stream = ustream; + } + } + else + { + FeatureNotSupportedException ex = new FeatureNotSupportedException(); + ex.unsupportedFeature = "Cannot uncompress compressed message: " + + "org.apache.tools.bzip2.CBZip2OutputStream was not found"; + throw ex; + } + } + info.stream.pos(IceInternal.Protocol.headerSize); + + switch(messageType) + { + case IceInternal.Protocol.closeConnectionMsg: + { + IceInternal.TraceUtil.traceHeader("received close connection", info.stream, _logger, _traceLevels); + if(_endpoint.datagram()) + { + if(_warn) + { + _logger.warning("ignoring close connection message for datagram connection:\n" + _desc); + } + } + else + { + setState(StateClosed, new CloseConnectionException()); + } + break; + } + + case IceInternal.Protocol.requestMsg: + { + if(_state == StateClosing) + { + IceInternal.TraceUtil.traceRequest("received request during closing\n" + + "(ignored by server, client will retry)", + info.stream, _logger, _traceLevels); + } + else + { + IceInternal.TraceUtil.traceRequest("received request", info.stream, _logger, _traceLevels); + info.requestId = info.stream.readInt(); + info.invokeNum = 1; + info.servantManager = _servantManager; + info.adapter = _adapter; + ++_dispatchCount; + } + break; + } + + case IceInternal.Protocol.requestBatchMsg: + { + if(_state == StateClosing) + { + IceInternal.TraceUtil.traceBatchRequest("received batch request during closing\n" + + "(ignored by server, client will retry)", + info.stream, _logger, _traceLevels); + } + else + { + IceInternal.TraceUtil.traceBatchRequest("received batch request", info.stream, _logger, + _traceLevels); + info.invokeNum = info.stream.readInt(); + if(info.invokeNum < 0) + { + info.invokeNum = 0; + throw new NegativeSizeException(); + } + info.servantManager = _servantManager; + info.adapter = _adapter; + _dispatchCount += info.invokeNum; + } + break; + } + + case IceInternal.Protocol.replyMsg: + { + IceInternal.TraceUtil.traceReply("received reply", info.stream, _logger, _traceLevels); + info.requestId = info.stream.readInt(); + IceInternal.Outgoing out = (IceInternal.Outgoing)_requests.remove(info.requestId); + if(out != null) + { + out.finished(info.stream); + } + else + { + info.outAsync = (IceInternal.OutgoingAsync)_asyncRequests.remove(info.requestId); + if(info.outAsync == null) + { + throw new UnknownRequestIdException(); + } + } + break; + } + + case IceInternal.Protocol.validateConnectionMsg: + { + IceInternal.TraceUtil.traceHeader("received validate connection", info.stream, _logger, + _traceLevels); + if(_warn) + { + _logger.warning("ignoring unexpected validate connection message:\n" + _desc); + } + break; + } + + default: + { + IceInternal.TraceUtil.traceHeader("received unknown message\n" + + "(invalid, closing connection)", info.stream, _logger, + _traceLevels); + throw new UnknownMessageException(); + } + } + } + catch(SocketException ex) + { + setState(StateClosed, ex); + } + catch(LocalException ex) + { + if(_endpoint.datagram()) + { + if(_warn) + { + _logger.warning("udp connection exception:\n" + ex + _desc); + } + } + else + { + setState(StateClosed, ex); + } + } } private void invokeAll(IceInternal.BasicStream stream, int invokeNum, int requestId, byte compress, - IceInternal.ServantManager servantManager, ObjectAdapter adapter) + IceInternal.ServantManager servantManager, ObjectAdapter adapter) { - // - // Note: In contrast to other private or protected methods, this - // operation must be called *without* the mutex locked. - // - - IceInternal.Incoming in = null; - try - { - while(invokeNum > 0) - { - - // - // Prepare the invocation. - // - boolean response = !_endpoint.datagram() && requestId != 0; - in = getIncoming(adapter, response, compress, requestId); - IceInternal.BasicStream is = in.is(); - stream.swap(is); - IceInternal.BasicStream os = in.os(); - - // - // Prepare the response if necessary. - // - if(response) - { - assert(invokeNum == 1); // No further invocations if a response is expected. - os.writeBlob(IceInternal.Protocol.replyHdr); - - // - // Add the request ID. - // - os.writeInt(requestId); - } - - in.invoke(servantManager); - - // - // If there are more invocations, we need the stream back. - // - if(--invokeNum > 0) - { - stream.swap(is); - } - - reclaimIncoming(in); - in = null; - } - } - catch(LocalException ex) - { - invokeException(ex, invokeNum); - } - catch(java.lang.AssertionError ex) // Upon assertion, we print the stack trace. - { - UnknownException uex = new UnknownException(); - java.io.StringWriter sw = new java.io.StringWriter(); - java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - uex.unknown = sw.toString(); - _logger.error(uex.unknown); - invokeException(uex, invokeNum); - } - finally - { - if(in != null) - { - reclaimIncoming(in); - } - } + // + // Note: In contrast to other private or protected methods, this + // operation must be called *without* the mutex locked. + // + + IceInternal.Incoming in = null; + try + { + while(invokeNum > 0) + { + + // + // Prepare the invocation. + // + boolean response = !_endpoint.datagram() && requestId != 0; + in = getIncoming(adapter, response, compress, requestId); + IceInternal.BasicStream is = in.is(); + stream.swap(is); + IceInternal.BasicStream os = in.os(); + + // + // Prepare the response if necessary. + // + if(response) + { + assert(invokeNum == 1); // No further invocations if a response is expected. + os.writeBlob(IceInternal.Protocol.replyHdr); + + // + // Add the request ID. + // + os.writeInt(requestId); + } + + in.invoke(servantManager); + + // + // If there are more invocations, we need the stream back. + // + if(--invokeNum > 0) + { + stream.swap(is); + } + + reclaimIncoming(in); + in = null; + } + } + catch(LocalException ex) + { + invokeException(ex, invokeNum); + } + catch(java.lang.AssertionError ex) // Upon assertion, we print the stack trace. + { + UnknownException uex = new UnknownException(); + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + uex.unknown = sw.toString(); + _logger.error(uex.unknown); + invokeException(uex, invokeNum); + } + finally + { + if(in != null) + { + reclaimIncoming(in); + } + } } private void run() { - // - // For non-datagram connections, the thread-per-connection - // must validate and activate this connection, and not in the - // connection factory. Please see the comments in the - // connection factory for details. - // - if(!_endpoint.datagram()) - { - try - { - validate(); - } - catch(LocalException ex) - { - synchronized(this) - { - assert(_state == StateClosed); - - // - // We must make sure that nobody is sending when - // we close the transceiver. - // - synchronized(_sendMutex) - { - if(_transceiver != null) - { - try - { - _transceiver.close(); - } - catch(LocalException e) - { - // Here we ignore any exceptions in close(). - } - - _transceiver = null; - } - notifyAll(); - } - } - return; - } - - activate(); - } - - boolean warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; - - boolean closed = false; - - IceInternal.BasicStream stream = new IceInternal.BasicStream(_instance); - - while(!closed) - { - // - // We must accept new connections outside the thread - // synchronization, because we use blocking accept. - // - - try - { - try - { - stream.resize(IceInternal.Protocol.headerSize, true); - stream.pos(0); - _transceiver.read(stream, -1); - - int pos = stream.pos(); - if(pos < IceInternal.Protocol.headerSize) - { - // - // This situation is possible for small UDP packets. - // - throw new IllegalMessageSizeException(); - } - stream.pos(0); - byte[] m = stream.readBlob(4); - if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || - m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) - { - BadMagicException ex = new BadMagicException(); - ex.badMagic = m; - throw ex; - } - byte pMajor = stream.readByte(); - byte pMinor = stream.readByte(); - if(pMajor != IceInternal.Protocol.protocolMajor) - { - UnsupportedProtocolException e = new UnsupportedProtocolException(); - e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor; - e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor; - e.major = IceInternal.Protocol.protocolMajor; - e.minor = IceInternal.Protocol.protocolMinor; - throw e; - } - byte eMajor = stream.readByte(); - byte eMinor = stream.readByte(); - if(eMajor != IceInternal.Protocol.encodingMajor) - { - UnsupportedEncodingException e = new UnsupportedEncodingException(); - e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor; - e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor; - e.major = IceInternal.Protocol.encodingMajor; - e.minor = IceInternal.Protocol.encodingMinor; - throw e; - } - byte messageType = stream.readByte(); - byte compress = stream.readByte(); - int size = stream.readInt(); - if(size < IceInternal.Protocol.headerSize) - { - throw new IllegalMessageSizeException(); - } - if(size > _instance.messageSizeMax()) - { - throw new MemoryLimitException(); - } - if(size > stream.size()) - { - stream.resize(size, true); - } - stream.pos(pos); - - if(pos != stream.size()) - { - if(_endpoint.datagram()) - { - if(warnUdp) - { - _logger.warning("DatagramLimitException: maximum size of " + pos + " exceeded"); - } - throw new DatagramLimitException(); - } - else - { - _transceiver.read(stream, -1); - assert(stream.pos() == stream.size()); - } - } - } - catch(DatagramLimitException ex) // Expected. - { - continue; - } - catch(SocketException ex) - { - exception(ex); - } - catch(LocalException ex) - { - if(_endpoint.datagram()) - { - if(_warn) - { - _logger.warning("datagram connection exception:\n" + ex + "\n" + _desc); - } - continue; - } - else - { - exception(ex); - } - } - - MessageInfo info = new MessageInfo(stream); - - LocalException localEx = null; - - IceInternal.IntMap requests = null; - IceInternal.IntMap asyncRequests = null; - - synchronized(this) - { - while(_state == StateHolding) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - if(_state != StateClosed) - { - parseMessage(info); - } - - // - // parseMessage() can close the connection, so we must - // check for closed state again. - // - if(_state == StateClosed) - { - // - // We must make sure that nobody is sending when we close - // the transceiver. - // - synchronized(_sendMutex) - { - try - { - _transceiver.close(); - } - catch(LocalException ex) - { - localEx = ex; - } - - _transceiver = null; - notifyAll(); - } - - // - // We cannot simply return here. We have to make sure - // that all requests (regular and async) are notified - // about the closed connection below. - // - closed = true; - } - - if(_state == StateClosed || _state == StateClosing) - { - requests = _requests; - _requests = new IceInternal.IntMap(); - - asyncRequests = _asyncRequests; - _asyncRequests = new IceInternal.IntMap(); - } - } - - // - // Asynchronous replies must be handled outside the thread - // synchronization, so that nested calls are possible. - // - if(info.outAsync != null) - { - info.outAsync.__finished(info.stream); - } - - // - // Method invocation (or multiple invocations for batch messages) - // must be done outside the thread synchronization, so that nested - // calls are possible. - // - invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, - info.adapter); - - if(requests != null) - { - java.util.Iterator i = requests.entryIterator(); - while(i.hasNext()) - { - IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next(); - IceInternal.Outgoing out = (IceInternal.Outgoing)e.getValue(); - out.finished(_exception); // The exception is immutable at this point. - } - } - - if(asyncRequests != null) - { - java.util.Iterator i = asyncRequests.entryIterator(); - while(i.hasNext()) - { - IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next(); - IceInternal.OutgoingAsync out = (IceInternal.OutgoingAsync)e.getValue(); - out.__finished(_exception); // The exception is immutable at this point. - } - } - - if(localEx != null) - { - assert(closed); - throw localEx; - } - } - finally - { - stream.reset(); - } - } + // + // For non-datagram connections, the thread-per-connection + // must validate and activate this connection, and not in the + // connection factory. Please see the comments in the + // connection factory for details. + // + if(!_endpoint.datagram()) + { + try + { + validate(); + } + catch(LocalException ex) + { + synchronized(this) + { + assert(_state == StateClosed); + + // + // We must make sure that nobody is sending when + // we close the transceiver. + // + synchronized(_sendMutex) + { + if(_transceiver != null) + { + try + { + _transceiver.close(); + } + catch(LocalException e) + { + // Here we ignore any exceptions in close(). + } + + _transceiver = null; + } + notifyAll(); + } + } + return; + } + + activate(); + } + + boolean warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; + + boolean closed = false; + + IceInternal.BasicStream stream = new IceInternal.BasicStream(_instance); + + while(!closed) + { + // + // We must accept new connections outside the thread + // synchronization, because we use blocking accept. + // + + try + { + try + { + stream.resize(IceInternal.Protocol.headerSize, true); + stream.pos(0); + _transceiver.read(stream, -1); + + int pos = stream.pos(); + if(pos < IceInternal.Protocol.headerSize) + { + // + // This situation is possible for small UDP packets. + // + throw new IllegalMessageSizeException(); + } + stream.pos(0); + byte[] m = stream.readBlob(4); + if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || + m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) + { + BadMagicException ex = new BadMagicException(); + ex.badMagic = m; + throw ex; + } + byte pMajor = stream.readByte(); + byte pMinor = stream.readByte(); + if(pMajor != IceInternal.Protocol.protocolMajor) + { + UnsupportedProtocolException e = new UnsupportedProtocolException(); + e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor; + e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor; + e.major = IceInternal.Protocol.protocolMajor; + e.minor = IceInternal.Protocol.protocolMinor; + throw e; + } + byte eMajor = stream.readByte(); + byte eMinor = stream.readByte(); + if(eMajor != IceInternal.Protocol.encodingMajor) + { + UnsupportedEncodingException e = new UnsupportedEncodingException(); + e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor; + e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor; + e.major = IceInternal.Protocol.encodingMajor; + e.minor = IceInternal.Protocol.encodingMinor; + throw e; + } + byte messageType = stream.readByte(); + byte compress = stream.readByte(); + int size = stream.readInt(); + if(size < IceInternal.Protocol.headerSize) + { + throw new IllegalMessageSizeException(); + } + if(size > _instance.messageSizeMax()) + { + throw new MemoryLimitException(); + } + if(size > stream.size()) + { + stream.resize(size, true); + } + stream.pos(pos); + + if(pos != stream.size()) + { + if(_endpoint.datagram()) + { + if(warnUdp) + { + _logger.warning("DatagramLimitException: maximum size of " + pos + " exceeded"); + } + throw new DatagramLimitException(); + } + else + { + _transceiver.read(stream, -1); + assert(stream.pos() == stream.size()); + } + } + } + catch(DatagramLimitException ex) // Expected. + { + continue; + } + catch(SocketException ex) + { + exception(ex); + } + catch(LocalException ex) + { + if(_endpoint.datagram()) + { + if(_warn) + { + _logger.warning("datagram connection exception:\n" + ex + "\n" + _desc); + } + continue; + } + else + { + exception(ex); + } + } + + MessageInfo info = new MessageInfo(stream); + + LocalException localEx = null; + + IceInternal.IntMap requests = null; + IceInternal.IntMap asyncRequests = null; + + synchronized(this) + { + while(_state == StateHolding) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + if(_state != StateClosed) + { + parseMessage(info); + } + + // + // parseMessage() can close the connection, so we must + // check for closed state again. + // + if(_state == StateClosed) + { + // + // We must make sure that nobody is sending when we close + // the transceiver. + // + synchronized(_sendMutex) + { + try + { + _transceiver.close(); + } + catch(LocalException ex) + { + localEx = ex; + } + + _transceiver = null; + notifyAll(); + } + + // + // We cannot simply return here. We have to make sure + // that all requests (regular and async) are notified + // about the closed connection below. + // + closed = true; + } + + if(_state == StateClosed || _state == StateClosing) + { + requests = _requests; + _requests = new IceInternal.IntMap(); + + asyncRequests = _asyncRequests; + _asyncRequests = new IceInternal.IntMap(); + } + } + + // + // Asynchronous replies must be handled outside the thread + // synchronization, so that nested calls are possible. + // + if(info.outAsync != null) + { + info.outAsync.__finished(info.stream); + } + + // + // Method invocation (or multiple invocations for batch messages) + // must be done outside the thread synchronization, so that nested + // calls are possible. + // + invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, + info.adapter); + + if(requests != null) + { + java.util.Iterator i = requests.entryIterator(); + while(i.hasNext()) + { + IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next(); + IceInternal.Outgoing out = (IceInternal.Outgoing)e.getValue(); + out.finished(_exception); // The exception is immutable at this point. + } + } + + if(asyncRequests != null) + { + java.util.Iterator i = asyncRequests.entryIterator(); + while(i.hasNext()) + { + IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.next(); + IceInternal.OutgoingAsync out = (IceInternal.OutgoingAsync)e.getValue(); + out.__finished(_exception); // The exception is immutable at this point. + } + } + + if(localEx != null) + { + assert(closed); + throw localEx; + } + } + finally + { + stream.reset(); + } + } } private void @@ -2548,7 +2548,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); + ex.printStackTrace(pw); pw.flush(); String s = msg + ":\n" + _desc + "\n" + sw.toString(); _logger.warning(s); @@ -2559,7 +2559,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); - ex.printStackTrace(pw); + ex.printStackTrace(pw); pw.flush(); String s = msg + ":\n" + _desc + "\n" + sw.toString(); _logger.error(s); @@ -2570,27 +2570,27 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { IceInternal.Incoming in = null; - if(_cacheBuffers) - { - synchronized(_incomingCacheMutex) - { - if(_incomingCache == null) - { - in = new IceInternal.Incoming(_instance, this, adapter, response, compress, requestId); - } - else - { - in = _incomingCache; - _incomingCache = _incomingCache.next; - in.reset(_instance, this, adapter, response, compress, requestId); - in.next = null; - } - } - } - else - { - in = new IceInternal.Incoming(_instance, this, adapter, response, compress, requestId); - } + if(_cacheBuffers) + { + synchronized(_incomingCacheMutex) + { + if(_incomingCache == null) + { + in = new IceInternal.Incoming(_instance, this, adapter, response, compress, requestId); + } + else + { + in = _incomingCache; + _incomingCache = _incomingCache.next; + in.reset(_instance, this, adapter, response, compress, requestId); + in.next = null; + } + } + } + else + { + in = new IceInternal.Incoming(_instance, this, adapter, response, compress, requestId); + } return in; } @@ -2598,96 +2598,96 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private void reclaimIncoming(IceInternal.Incoming in) { - if(_cacheBuffers) - { - synchronized(_incomingCacheMutex) - { - in.next = _incomingCache; - _incomingCache = in; - // - // Clear references to Ice objects as soon as possible. - // - _incomingCache.reclaim(); - } - } + if(_cacheBuffers) + { + synchronized(_incomingCacheMutex) + { + in.next = _incomingCache; + _incomingCache = in; + // + // Clear references to Ice objects as soon as possible. + // + _incomingCache.reclaim(); + } + } } public IceInternal.Outgoing getOutgoing(IceInternal.Reference reference, String operation, OperationMode mode, java.util.Map context, - boolean compress) - throws IceInternal.LocalExceptionWrapper + boolean compress) + throws IceInternal.LocalExceptionWrapper { - IceInternal.Outgoing out = null; - - if(_cacheBuffers) - { - synchronized(_outgoingCacheMutex) - { - if(_outgoingCache == null) - { - out = new IceInternal.Outgoing(this, reference, operation, mode, context, compress); - } - else - { - out = _outgoingCache; - _outgoingCache = _outgoingCache.next; - out.reset(reference, operation, mode, context, compress); - out.next = null; - } - } - } - else - { - out = new IceInternal.Outgoing(this, reference, operation, mode, context, compress); - } - - return out; + IceInternal.Outgoing out = null; + + if(_cacheBuffers) + { + synchronized(_outgoingCacheMutex) + { + if(_outgoingCache == null) + { + out = new IceInternal.Outgoing(this, reference, operation, mode, context, compress); + } + else + { + out = _outgoingCache; + _outgoingCache = _outgoingCache.next; + out.reset(reference, operation, mode, context, compress); + out.next = null; + } + } + } + else + { + out = new IceInternal.Outgoing(this, reference, operation, mode, context, compress); + } + + return out; } public void reclaimOutgoing(IceInternal.Outgoing out) { - if(_cacheBuffers) - { - // - // Clear references to Ice objects as soon as possible. - // - out.reclaim(); - - synchronized(_outgoingCacheMutex) - { - out.next = _outgoingCache; - _outgoingCache = out; - } - } + if(_cacheBuffers) + { + // + // Clear references to Ice objects as soon as possible. + // + out.reclaim(); + + synchronized(_outgoingCacheMutex) + { + out.next = _outgoingCache; + _outgoingCache = out; + } + } } private class ThreadPerConnection extends Thread { - public void - run() - { - if(ConnectionI.this._instance.initializationData().threadHook != null) - { - ConnectionI.this._instance.initializationData().threadHook.start(); - } - - try - { - ConnectionI.this.run(); - } - catch(Exception ex) - { - ConnectionI.this.error("exception in thread per connection", ex); - } - finally - { - if(ConnectionI.this._instance.initializationData().threadHook != null) - { - ConnectionI.this._instance.initializationData().threadHook.stop(); - } - } - } + public void + run() + { + if(ConnectionI.this._instance.initializationData().threadHook != null) + { + ConnectionI.this._instance.initializationData().threadHook.start(); + } + + try + { + ConnectionI.this.run(); + } + catch(Exception ex) + { + ConnectionI.this.error("exception in thread per connection", ex); + } + finally + { + if(ConnectionI.this._instance.initializationData().threadHook != null) + { + ConnectionI.this._instance.initializationData().threadHook.stop(); + } + } + } } private Thread _thread; private final boolean _threadPerConnection; |