diff options
Diffstat (limited to 'javae/src/Ice/Connection.java')
-rw-r--r-- | javae/src/Ice/Connection.java | 1867 |
1 files changed, 0 insertions, 1867 deletions
diff --git a/javae/src/Ice/Connection.java b/javae/src/Ice/Connection.java deleted file mode 100644 index 4e600fa3d5a..00000000000 --- a/javae/src/Ice/Connection.java +++ /dev/null @@ -1,1867 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. -// -// This copy of Ice-E is licensed to you under the terms described in the -// ICEE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package Ice; - -public final class Connection -{ - - synchronized public void - waitForValidation() - { - while(_state == StateNotValidated) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - if(_state >= StateClosing) - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_exception != null); - } - throw _exception; - } - } - - - public synchronized void - activate() - { - setState(StateActive); - } - - public synchronized void - hold() - { - setState(StateHolding); - } - - // DestructionReason. - public final static int ObjectAdapterDeactivated = 0; - public final static int CommunicatorDestroyed = 1; - - public synchronized void - destroy(int reason) - { - switch(reason) - { - case ObjectAdapterDeactivated: - { - setState(StateClosing, new ObjectAdapterDeactivatedException()); - break; - } - - case CommunicatorDestroyed: - { - setState(StateClosing, new CommunicatorDestroyedException()); - break; - } - } - } - - public synchronized void - close(boolean force) - { - if(force) - { - setState(StateClosed, new ForcedCloseConnectionException()); - } - else - { - // - // If we do a graceful shutdown, then we wait until all - // outstanding requests have been completed. Otherwise, - // the CloseConnectionException will cause all outstanding - // requests to be retried, regardless of whether the - // server has processed them or not. - // - while(!_requests.isEmpty()) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - setState(StateClosing, new CloseConnectionException()); - } - } - - public synchronized boolean - isDestroyed() - { - return _state >= StateClosing; - } - - public boolean - isFinished() - { - Thread threadPerConnection = null; - - synchronized(this) - { - if(_transceiver != null || _dispatchCount != 0 || - (_threadPerConnection != null && _threadPerConnection.isAlive())) - { - return false; - } - - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_state == StateClosed); - } - - threadPerConnection = _threadPerConnection; - _threadPerConnection = null; - } - - if(threadPerConnection != null) - { - while(true) - { - try - { - threadPerConnection.join(); - break; - } - catch(InterruptedException ex) - { - } - } - } - - return true; - } - - public synchronized void - throwException() - { - if(_exception != null) - { - IceUtil.Debug.Assert(_state >= StateClosing); - throw _exception; - } - } - - public synchronized void - waitUntilHolding() - { - while(_state < StateHolding || _dispatchCount > 0) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - } - - public void - waitUntilFinished() - { - Thread threadPerConnection = null; - - synchronized(this) - { - // - // We wait indefinitely until connection closing has been - // initiated. We also wait indefinitely until all outstanding - // requests are completed. Otherwise we couldn't guarantee - // that there are no outstanding calls when deactivate() is - // called on the servant locators. - // - while(_state < StateClosing || _dispatchCount > 0) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - // - // Now we must wait until close() has been called on the - // transceiver. - // - while(_transceiver != null) - { - try - { - if(_state != StateClosed && _endpoint.timeout() >= 0) - { - long absoluteWaitTime = _stateTime + _endpoint.timeout(); - long waitTime = absoluteWaitTime - System.currentTimeMillis(); - - if(waitTime > 0) - { - // - // We must wait a bit longer until we close this - // connection. - // - wait(waitTime); - if(System.currentTimeMillis() >= absoluteWaitTime) - { - setState(StateClosed, new CloseTimeoutException()); - } - } - else - { - // - // We already waited long enough, so let's close this - // connection! - // - setState(StateClosed, new CloseTimeoutException()); - } - - // - // No return here, we must still wait until - // close() is called on the _transceiver. - // - } - else - { - wait(); - } - } - catch(InterruptedException ex) - { - } - } - - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_state == StateClosed); - } - - threadPerConnection = _threadPerConnection; - _threadPerConnection = null; - } - - if(threadPerConnection != null) - { - while(true) - { - try - { - threadPerConnection.join(); - break; - } - catch(InterruptedException ex) - { - } - } - } - } - - public void - sendRequest(IceInternal.BasicStream os, IceInternal.Outgoing out) - throws IceInternal.LocalExceptionWrapper - { - boolean requestSent = false; - try - { - synchronized(_sendMonitor) - { - if(_transceiver == null) - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_exception != null); - - } - throw new IceInternal.LocalExceptionWrapper(_exception, true); - } - - int requestId = 0; - if(out != null) - { - // - // Create a new unique request ID. - // - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } - - // - // Fill in the request ID. - // - os.pos(IceInternal.Protocol.headerSize); - os.writeInt(requestId); - - _requests.put(requestId, out); - } - - // - // Fill in the message size. - // - os.pos(10); - os.writeInt(os.size()); - - // - // Send the request. - // - IceInternal.TraceUtil.traceRequest("sending request", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); - requestSent = true; - - if(out == null) - { - return; - } - - if(_blocking) - { - // - // Re-use the stream for reading the reply. - // - os.reset(); - - // - // Read the reply. - // - MessageInfo info = new MessageInfo(); - readStreamAndParseMessage(os, info); - if(info.invokeNum > 0) - { - Ice.Util.throwUnknownMessageException(); - } - else if(info.requestId != requestId) - { - Ice.Util.throwUnknownRequestIdException(); - } - - out.finished(os); - } - else - { - // - // Wait until the request has completed, or until the - // request times out. - // - int tout = timeout(); - long expireTime = 0; - if(tout > 0) - { - expireTime = System.currentTimeMillis() + tout; - } - - while(out.state() == IceInternal.Outgoing.StateInProgress) - { - try - { - if(tout > 0) - { - long now = System.currentTimeMillis(); - if(now < expireTime) - { - _sendMonitor.wait(expireTime - now); - } - - // - // Make sure we woke up because of timeout and not another response. - // - if(out.state() == IceInternal.Outgoing.StateInProgress && - System.currentTimeMillis() > expireTime) - { - throw new TimeoutException(); - } - } - else - { - _sendMonitor.wait(); - } - } - catch(InterruptedException ex) - { - } - } - } - } - } - catch(LocalException ex) - { - synchronized(this) - { - setState(StateClosed, ex); - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_exception != null); - } - if(!requestSent) - { - throw _exception; - } - } - - // - // If the request was already sent, we don't throw - // directly but instead we set the Outgoing object - // exception with finished(). Throwing directly would - // break "at-most-once" (see also comment in - // Outgoing.invoke()) - // - synchronized(_sendMonitor) - { - if(_blocking) - { - out.finished(ex); - } - else - { - // Wait for the connection thread to propagate the exception - // to the Outgoing object. - while(out.state() == IceInternal.Outgoing.StateInProgress) - { - try - { - _sendMonitor.wait(); - } - catch(java.lang.InterruptedException e) - { - } - } - } - } - } - } - - public synchronized void - prepareBatchRequest(IceInternal.BasicStream os) - { - while(_batchStreamInUse && _exception == null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - if(_exception != null) - { - throw _exception; - } - - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_state > StateNotValidated); - IceUtil.Debug.Assert(_state < StateClosing); - } - - if(_batchStream.isEmpty()) - { - try - { - _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr); - } - catch(LocalException ex) - { - setState(StateClosed, ex); - throw ex; - } - } - - _batchStreamInUse = true; - _batchMarker = _batchStream.size(); - _batchStream.swap(os); - - // - // The batch stream now belongs to the caller, until - // finishBatchRequest() or abortBatchRequest() is called. - // - } - - public void - finishBatchRequest(IceInternal.BasicStream os) - { - boolean autoflush = false; - byte[] lastRequest = null; - - synchronized(this) - { - // - // Get the batch stream back. - // - _batchStream.swap(os); - - if(_batchAutoFlush) - { - synchronized(_sendMonitor) - { - if(_transceiver == null) - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_exception != null); - } - throw _exception; // The exception is immutable at this point. - } - // - // Throw memory limit exception if the first message added causes us - // to go over limit. Otherwise put aside the marshalled message that - // caused limit to be exceeded and rollback stream to the marker. - // - if(_batchStream.size() > _instance.messageSizeMax()) - { - if(_batchRequestNum == 0) - { - resetBatch(true); - throw new Ice.MemoryLimitException(); - } - - lastRequest = new byte[_batchStream.size() - _batchMarker]; - IceInternal.ByteBuffer buffer = _batchStream.prepareRead(); - buffer.position(_batchMarker); - buffer.get(lastRequest); - _batchStream.resize(_batchMarker, false); - autoflush = true; - } - } - } - - if(!autoflush) - { - // - // Increment the number of requests in the batch. - // - ++_batchRequestNum; - - // - // Notify about the batch stream not being in use anymore. - // - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_batchStreamInUse); - } - _batchStreamInUse = false; - notifyAll(); - } - } - - if(autoflush) - { - // - // We have to keep _batchStreamInUse set until after we insert the - // saved marshalled data into a new stream. - // - flushBatchRequestsInternal(true); - - synchronized(this) - { - // - // Throw memory limit exception if the message that caused us to go over - // limit causes us to exceed the limit by itself. - // - if(IceInternal.Protocol.requestBatchHdr.length + lastRequest.length > _instance.messageSizeMax()) - { - resetBatch(true); - throw new MemoryLimitException(); - } - - // - // Start a new batch with the last message that caused us to - // go over the limit. - // - try - { - _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr); - _batchStream.writeBlob(lastRequest); - } - catch(LocalException ex) - { - setState(StateClosed, ex); - throw ex; - } - - // - // Notify that the batch stream not in use anymore. - // - ++_batchRequestNum; - _batchStreamInUse = false; - notifyAll(); - } - } - } - - public synchronized void - abortBatchRequest() - { - // - // Reset the batch stream. We cannot save old requests - // in the batch stream, as they might be corrupted due to - // incomplete marshaling. - // - resetBatch(true); - } - - public void - flushBatchRequests() - { - flushBatchRequestsInternal(false); - } - - private void - flushBatchRequestsInternal(boolean ignoreInUse) - { - synchronized(this) - { - if(!ignoreInUse) - { - while(_batchStreamInUse && _exception == null) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - } - - if(_exception != null) - { - throw _exception; - } - - if(_batchStream.isEmpty()) - { - return; // Nothing to do. - } - - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_state > StateNotValidated); - IceUtil.Debug.Assert(_state < StateClosing); - } - - // - // Fill in the message size. - // - _batchStream.pos(10); - _batchStream.writeInt(_batchStream.size()); - - // - // Fill in the number of requests in the batch. - // - _batchStream.writeInt(_batchRequestNum); - - // - // Compression not supported. - // - _batchStream.pos(9); - _batchStream.writeByte((byte)(0)); - - // - // Prevent that new batch requests are added while we are - // flushing. - // - _batchStreamInUse = true; - } - - try - { - synchronized(_sendMonitor) - { - if(_transceiver == null) // Has the transceiver already been closed? - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_exception != null); - } - throw _exception; // The exception is immutable at this point. - } - - // - // Send the batch request. - // - IceInternal.TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); - _transceiver.write(_batchStream, _endpoint.timeout()); - } - } - catch(LocalException ex) - { - synchronized(this) - { - setState(StateClosed, ex); - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_exception != null); - } - - // - // Since batch requests are all oneways, we - // must report the exception to the caller. - // - throw _exception; - } - } - - synchronized(this) - { - // - // Reset the batch stream, and notify that flushing is over. - // - resetBatch(!ignoreInUse); - } - } - - private void - resetBatch(boolean resetInUse) - { - _batchStream = new IceInternal.BasicStream(_instance, _batchAutoFlush); - _batchRequestNum = 0; - - // - // Notify about the batch stream not being in use anymore. - // - if(resetInUse) - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_batchStreamInUse); - } - _batchStreamInUse = false; - notifyAll(); - } - } - - public void - sendResponse(IceInternal.BasicStream os) - { - try - { - synchronized(_sendMonitor) - { - if(_transceiver == null) // Has the transceiver already been closed? - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_exception != null); - } - throw _exception; // The exception is immutable at this point. - } - - // - // Compression not supported. - // - os.pos(9); - os.writeByte((byte)(0)); - - // - // Fill in the message size. - // - os.pos(10); - os.writeInt(os.size()); - - // - // Send the reply. - // - IceInternal.TraceUtil.traceReply("sending reply", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); - } - } - catch(LocalException ex) - { - synchronized(this) - { - setState(StateClosed, ex); - } - } - - synchronized(this) - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_state > StateNotValidated); - } - - try - { - if(--_dispatchCount == 0) - { - notifyAll(); - } - - if(_state == StateClosing && _dispatchCount == 0) - { - initiateShutdown(); - } - } - catch(LocalException ex) - { - setState(StateClosed, ex); - } - } - } - - public synchronized void - sendNoResponse() - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_state > StateNotValidated); - } - - try - { - if(--_dispatchCount == 0) - { - notifyAll(); - } - - if(_state == StateClosing && _dispatchCount == 0) - { - initiateShutdown(); - } - } - catch(LocalException ex) - { - setState(StateClosed, ex); - } - } - - public IceInternal.Endpoint - endpoint() - { - // No mutex protection necessary, _endpoint is immutable. - return _endpoint; - } - - public synchronized void - setAdapter(ObjectAdapter adapter) - { - if(_blocking) - { - FeatureNotSupportedException ex = new FeatureNotSupportedException(); - ex.unsupportedFeature = "setAdapter with blocking connection"; - throw ex; - } - - // - // Wait for all the incoming to be dispatched (to be consistent - // with IceE). - // - while(_dispatchCount > 0) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - if(_exception != null) - { - throw _exception; - } - - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_state < StateClosing); - } - - _in.setAdapter(adapter); - } - - public synchronized ObjectAdapter - getAdapter() - { - return _in.getAdapter(); - } - - public synchronized ObjectPrx - createProxy(Identity ident) - { - // - // Create a reference and return a reverse proxy for this - // reference. - // - Connection[] connections = new Connection[1]; - connections[0] = this; - IceInternal.Reference ref = - _instance.referenceFactory().create(ident, null, "", IceInternal.Reference.ModeTwoway, connections); - return _instance.proxyFactory().referenceToProxy(ref); - } - - public String - type() - { - return _type; // No mutex lock, _type is immutable. - } - - public int - timeout() - { - return _endpoint.timeout(); // No mutex protection necessary, _endpoint is immutable. - } - - public String - toString() - { - return _desc; // No mutex lock, _desc is immutable. - } - - public Connection(IceInternal.Instance instance, IceInternal.Transceiver transceiver, - IceInternal.Endpoint endpoint, ObjectAdapter adapter) - { - _instance = instance; - _transceiver = transceiver; - _desc = transceiver.toString(); - _type = transceiver.type(); - _endpoint = endpoint; - _logger = instance.initializationData().logger; // Cached for better performance. - _traceLevels = instance.traceLevels(); // Cached for better performance. - _warn = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; - _nextRequestId = 1; - _batchAutoFlush = _instance.initializationData().properties.getPropertyAsIntWithDefault( - "Ice.BatchAutoFlush", 1) > 0 ? true : false; - _batchStream = new IceInternal.BasicStream(instance, _batchAutoFlush); - _batchStreamInUse = false; - _batchRequestNum = 0; - _dispatchCount = 0; - _state = StateNotValidated; - _stateTime = System.currentTimeMillis(); - _blocking = _instance.initializationData().properties.getPropertyAsInt("Ice.Blocking") > 0 && adapter == null; - _stream = new IceInternal.BasicStream(_instance); - _in = new IceInternal.Incoming(_instance, this, _stream, adapter); - - if(_blocking) - { - validate(); - } - else - { - try - { - // - // If we are in thread per connection mode, create the thread - // for this connection. - // - _threadPerConnection = new ThreadPerConnection(this); - _threadPerConnection.start(); - } - catch(java.lang.Exception ex) - { - ex.printStackTrace(); - String s = "cannot create thread for connection:\n";; - s += ex.toString(); - _logger.error(s); - - try - { - _transceiver.close(); - } - catch(LocalException e) - { - // Here we ignore any exceptions in close(). - } - - Ice.SyscallException e = new Ice.SyscallException(); - e.initCause(ex); - throw e; - } - } - } - - protected synchronized void - finalize() - throws Throwable - { - IceUtil.Debug.FinalizerAssert(_state == StateClosed); - IceUtil.Debug.FinalizerAssert(_transceiver == null); - IceUtil.Debug.FinalizerAssert(_dispatchCount == 0); - IceUtil.Debug.FinalizerAssert(_threadPerConnection == null); - } - - private static final int StateNotValidated = 0; - private static final int StateActive = 1; - private static final int StateHolding = 2; - private static final int StateClosing = 3; - private static final int StateClosed = 4; - - private void - validate() - { - boolean active; - - synchronized(this) - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_state == StateNotValidated || _state == StateClosed); - } - if(_state == StateClosed) - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_exception != null); - } - throw _exception; - } - - if(_in.getAdapter() != null) - { - active = true; // The server side has the active role for connection validation. - } - else - { - active = false; // The client side has the passive role for connection validation. - } - } - - try - { - int timeout; - IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - if(defaultsAndOverrides.overrideConnectTimeout) - { - timeout = defaultsAndOverrides.overrideConnectTimeoutValue; - } - else - { - timeout = _endpoint.timeout(); - } - - if(active) - { - synchronized(_sendMonitor) - { - IceInternal.BasicStream os = new IceInternal.BasicStream(_instance); - os.writeBlob(IceInternal.Protocol.magic); - os.writeByte(IceInternal.Protocol.protocolMajor); - os.writeByte(IceInternal.Protocol.protocolMinor); - os.writeByte(IceInternal.Protocol.encodingMajor); - os.writeByte(IceInternal.Protocol.encodingMinor); - os.writeByte(IceInternal.Protocol.validateConnectionMsg); - os.writeByte((byte)0); // Compression status (always zero for validate connection). - os.writeInt(IceInternal.Protocol.headerSize); // Message size. - IceInternal.TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels); - try - { - _transceiver.write(os, timeout); - } - catch(Ice.TimeoutException ex) - { - throw new Ice.ConnectTimeoutException(); - } - } - } - else - { - IceInternal.BasicStream is = new IceInternal.BasicStream(_instance); - is.resize(IceInternal.Protocol.headerSize, true); - is.pos(0); - try - { - _transceiver.read(is, timeout); - } - catch(Ice.TimeoutException ex) - { - throw new Ice.ConnectTimeoutException(); - } - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(is.pos() == IceInternal.Protocol.headerSize); - } - is.pos(0); - byte[] m = is.readBlob(4); - if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || - m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) - { - Ice.Util.throwBadMagicException(m); - } - byte pMajor = is.readByte(); - byte pMinor = is.readByte(); - if(pMajor != IceInternal.Protocol.protocolMajor) - { - Ice.Util.throwUnsupportedProtocolException(pMajor, pMinor); - } - byte eMajor = is.readByte(); - byte eMinor = is.readByte(); - if(eMajor != IceInternal.Protocol.encodingMajor) - { - Ice.Util.throwUnsupportedEncodingException(eMajor, eMinor); - } - byte messageType = is.readByte(); - if(messageType != IceInternal.Protocol.validateConnectionMsg) - { - Ice.Util.throwConnectionNotValidatedException(); - } - byte compress = is.readByte(); // Ignore compression status for validate connection. - int size = is.readInt(); - if(size != IceInternal.Protocol.headerSize) - { - Ice.Util.throwIllegalMessageSizeException(); - } - IceInternal.TraceUtil.traceHeader("received validate connection", is, _logger, _traceLevels); - } - } - catch(LocalException ex) - { - synchronized(this) - { - setState(StateClosed, ex); - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_exception != null); - } - throw _exception; - } - } - - synchronized(this) - { - // - // We start out in holding state. - // - setState(StateHolding); - } - } - - private void - setState(int state, LocalException ex) - { - // - // If setState() is called with an exception, then only closed - // and closing states are permissible. - // - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(state == StateClosing || state == StateClosed); - } - - if(_state == state) // Don't switch twice. - { - return; - } - - if(_exception == null) - { - _exception = ex; - - if(_warn) - { - // - // We don't warn if we are not validated. - // - if(_state > StateNotValidated) - { - // - // Don't warn about certain expected exceptions. - // - if(!(_exception instanceof CloseConnectionException || - _exception instanceof ForcedCloseConnectionException || - _exception instanceof CommunicatorDestroyedException || - _exception instanceof ObjectAdapterDeactivatedException || - (_exception instanceof ConnectionLostException && _state == StateClosing))) - { - warning("connection exception", _exception); - } - } - } - } - - // - // We must set the new state before we notify requests of any - // exceptions. Otherwise new requests may retry on a - // connection that is not yet marked as closed or closing. - // - setState(state); - } - - private void - setState(int state) - { - // - // Skip graceful shutdown if we are destroyed before validation. - // - if(_state == StateNotValidated && state == StateClosing) - { - state = StateClosed; - } - - if(_state == state) // Don't switch twice. - { - return; - } - - switch(state) - { - case StateNotValidated: - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(false); - } - break; - } - - case StateActive: - { - // - // Can only switch from holding or not validated to - // active. - // - if(_state != StateHolding && _state != StateNotValidated) - { - return; - } - break; - } - - case StateHolding: - { - // - // Can only switch from active or not validated to - // holding. - // - if(_state != StateActive && _state != StateNotValidated) - { - return; - } - break; - } - - case StateClosing: - { - // - // Can't change back from closed. - // - if(_state == StateClosed) - { - return; - } - break; - } - - case StateClosed: - { - // - // We shutdown both for reading and writing. This will - // unblock and read call with an exception. The thread - // per connection then closes the transceiver. - // - _transceiver.shutdownReadWrite(); - - // - // In blocking mode, we close the transceiver now. - // - if(_blocking) - { - synchronized(_sendMonitor) - { - try - { - _transceiver.close(); - } - catch(Ice.LocalException ex) - { - } - _transceiver = null; - } - } - break; - } - } - - _state = state; - _stateTime = System.currentTimeMillis(); - - notifyAll(); - - if(_state == StateClosing && _dispatchCount == 0) - { - try - { - initiateShutdown(); - if(_blocking) - { - setState(StateClosed); - } - } - catch(LocalException ex) - { - setState(StateClosed, ex); - } - } - } - - private void - initiateShutdown() - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_state == StateClosing); - IceUtil.Debug.Assert(_dispatchCount == 0); - } - - synchronized(_sendMonitor) - { - // - // Before we shut down, we send a close connection - // message. - // - IceInternal.BasicStream os = new IceInternal.BasicStream(_instance); - os.writeBlob(IceInternal.Protocol.magic); - os.writeByte(IceInternal.Protocol.protocolMajor); - os.writeByte(IceInternal.Protocol.protocolMinor); - os.writeByte(IceInternal.Protocol.encodingMajor); - os.writeByte(IceInternal.Protocol.encodingMinor); - os.writeByte(IceInternal.Protocol.closeConnectionMsg); - os.writeByte((byte)0); // Compression not supported. - os.writeInt(IceInternal.Protocol.headerSize); // Message size. - - // - // Send the message. - // - IceInternal.TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); - // - // The CloseConnection message should be sufficient. Closing the write - // end of the socket is probably an artifact of how things were done - // in IIOP. In fact, shutting down the write end of the socket causes - // problems on Windows by preventing the peer from using the socket. - // For example, the peer is no longer able to continue writing a large - // message after the socket is shutdown. - // - //_transceiver.shutdownWrite(); - } - } - - private static class MessageInfo - { - int invokeNum; - int requestId; - } - - private void - readStreamAndParseMessage(IceInternal.BasicStream stream, MessageInfo info) - { - // - // Read the header. - // - stream.resize(IceInternal.Protocol.headerSize, true); - stream.pos(0); - _transceiver.read(stream, _blocking ? _endpoint.timeout() : -1); - - int pos = stream.pos(); - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(pos >= IceInternal.Protocol.headerSize); - } - stream.pos(0); - byte[] m = stream.readBlob(4); - if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || - m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) - { - Ice.Util.throwBadMagicException(m); - } - byte pMajor = stream.readByte(); - byte pMinor = stream.readByte(); - if(pMajor != IceInternal.Protocol.protocolMajor) - { - Ice.Util.throwUnsupportedProtocolException(pMajor, pMinor); - } - byte eMajor = stream.readByte(); - byte eMinor = stream.readByte(); - if(eMajor != IceInternal.Protocol.encodingMajor) - { - Ice.Util.throwUnsupportedEncodingException(eMajor, eMinor); - } - byte messageType = stream.readByte(); - byte compress = stream.readByte(); - if(compress == (byte)2) - { - FeatureNotSupportedException ex = new FeatureNotSupportedException(); - ex.unsupportedFeature = "compression"; - throw ex; - } - - int size = stream.readInt(); - if(size < IceInternal.Protocol.headerSize) - { - Ice.Util.throwIllegalMessageSizeException(); - } - if(size > _instance.messageSizeMax()) - { - throw new MemoryLimitException(); - } - if(size > stream.size()) - { - stream.resize(size, true); - } - stream.pos(pos); - - // - // Read the rest of the message. - // - if(pos != stream.size()) - { - _transceiver.read(stream, _blocking ? _endpoint.timeout() : -1); - } - - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(stream.pos() == stream.size()); - } - stream.pos(IceInternal.Protocol.headerSize); - - switch(messageType) - { - case IceInternal.Protocol.closeConnectionMsg: - { - IceInternal.TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels); - throw new CloseConnectionException(); - } - - case IceInternal.Protocol.replyMsg: - { - IceInternal.TraceUtil.traceReply("received reply", stream, _logger, _traceLevels); - info.requestId = stream.readInt(); - break; - } - - case IceInternal.Protocol.requestMsg: - { - IceInternal.TraceUtil.traceRequest("received request", stream, _logger, _traceLevels); - info.requestId = stream.readInt(); - info.invokeNum = 1; - break; - } - - case IceInternal.Protocol.requestBatchMsg: - { - IceInternal.TraceUtil.traceBatchRequest("received batch request", stream, _logger, _traceLevels); - info.invokeNum = stream.readInt(); - if(info.invokeNum < 0) - { - info.invokeNum = 0; - Ice.Util.throwNegativeSizeException(); - } - break; - } - - case IceInternal.Protocol.validateConnectionMsg: - { - IceInternal.TraceUtil.traceHeader("received validate connection", stream, _logger, _traceLevels); - if(_warn) - { - _logger.warning("ignoring unexpected validate connection message:\n" + _desc); - } - break; - } - - default: - { - IceInternal.TraceUtil.traceHeader("received unexpected message\n" + - "(invalid, closing connection)", stream, _logger, - _traceLevels); - Ice.Util.throwUnknownMessageException(); - } - } - } - - public void - run() - { - // - // The thread-per-connection must validate and activate this connection, - // and not in the connection factory. Please see the comments in the - // connection factory for details. - // - try - { - validate(); - } - catch(LocalException ex) - { - synchronized(this) - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_state == StateClosed); - } - - // - // We must make sure that nobody is sending when - // we close the transceiver. - // - synchronized(_sendMonitor) - { - try - { - _transceiver.close(); - } - catch(LocalException e) - { - // Here we ignore any exceptions in close(). - } - - _transceiver = null; - notifyAll(); - } - } - return; - } - - activate(); - - boolean closed = false; - - MessageInfo info = new MessageInfo(); - - while(!closed) - { - info.requestId = 0; - info.invokeNum = 0; - _in.os().reset(); - _in.is().reset(); - - // - // Read and parse the next message. We don't need to lock the - // send monitor here as we have the guarantee that - // _transceiver won't be set to 0 by another thread, the - // thread per connection is the only thread that can set - // _transceiver to 0. - // - try - { - readStreamAndParseMessage(_stream, info); - } - catch(Ice.LocalException ex) - { - synchronized(this) - { - setState(StateClosed, ex); - } - } - - synchronized(this) - { - if(_state != StateClosed) - { - if(info.invokeNum > 0) // We received a request or a batch request - { - if(_state == StateClosing) - { - IceInternal.TraceUtil.traceRequest( - "received " + (info.invokeNum > 1 ? "batch request" : "request") + " during closing\n"+ - "(ignored by server, client will retry)", _stream, _logger, _traceLevels); - info.invokeNum = 0; - } - _dispatchCount += info.invokeNum; - } - else if(info.requestId > 0) - { - try - { - synchronized(_sendMonitor) - { - IceInternal.Outgoing out = (IceInternal.Outgoing)_requests.remove(info.requestId); - if(out != null) - { - out.finished(_stream); - _sendMonitor.notifyAll(); // Wake up threads waiting in sendRequest() - } - else - { - Ice.Util.throwUnknownRequestIdException(); - } - } - } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - } - } - } - - while(_state == StateHolding) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - if(_state == StateClosed) - { - // - // We must make sure that nobody is sending when we close - // the transceiver. - // - synchronized(_sendMonitor) - { - try - { - _transceiver.close(); - } - catch(LocalException ex) - { - } - - _transceiver = null; - notifyAll(); - } - - // - // We cannot simply return here. We have to make sure - // that all requests are notified about the closed - // connection below. - // - closed = true; - } - - if(_state == StateClosed || _state == StateClosing) - { - synchronized(_sendMonitor) - { - java.util.Enumeration i = _requests.elements(); - while(i.hasMoreElements()) - { - IceInternal.IntMap.Entry e = (IceInternal.IntMap.Entry)i.nextElement(); - IceInternal.Outgoing out = (IceInternal.Outgoing)e.getValue(); - out.finished(_exception); // The exception is immutable at this point. - } - _requests.clear(); - _sendMonitor.notifyAll(); // Wake up threads waiting in sendRequest() - } - } - } - - // - // Method invocation (or multiple invocations for batch messages) - // must be done outside the thread synchronization, so that nested - // calls are possible. - // - try - { - for(; info.invokeNum > 0; --info.invokeNum) - { - // - // Prepare the response if necessary. - // - final boolean response = info.requestId != 0; - if(response) - { - if(IceUtil.Debug.ASSERT) - { - // No further invocations if a response is expected. - IceUtil.Debug.Assert(info.invokeNum == 1); - } - - // - // Add the reply header and request id. - // - IceInternal.BasicStream os = _in.os(); - os.writeBlob(IceInternal.Protocol.replyHdr); - os.writeInt(info.requestId); - } - - _in.invoke(response, info.requestId); - } - } - catch(LocalException ex) - { - synchronized(this) - { - setState(StateClosed, ex); - } - } - catch(IceUtil.AssertionError ex) // Upon assertion, we print the stack trace. - { - synchronized(this) - { - UnknownException uex = new UnknownException(); - uex.unknown = ex.toString(); - _logger.error(uex.unknown); - setState(StateClosed, uex); - } - } - catch(java.lang.Exception ex) - { - synchronized(this) - { - UnknownException uex = new UnknownException(); - uex.unknown = ex.toString(); - setState(StateClosed, uex); - } - } - - // - // If invoke() above raised an exception, and therefore - // neither sendResponse() nor sendNoResponse() has been - // called, then we must decrement _dispatchCount here. - // - if(info.invokeNum > 0) - { - synchronized(this) - { - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_dispatchCount > 0); - } - _dispatchCount -= info.invokeNum; - if(IceUtil.Debug.ASSERT) - { - IceUtil.Debug.Assert(_dispatchCount >= 0); - } - if(_dispatchCount == 0) - { - notifyAll(); - } - } - } - } - } - - public void - warning(String msg, Exception ex) - { - String s = msg + ":\n" + ex.toString() + "\n" + _desc; - _logger.warning(s); - } - - public void - error(String msg, Exception ex) - { - String s = msg + ":\n" + ex.toString() + "\n" + _desc; - _logger.error(s); - } - - public IceInternal.Outgoing - getOutgoing(IceInternal.Reference reference, String operation, OperationMode mode, java.util.Hashtable context) - { - IceInternal.Outgoing out; - - synchronized(_outgoingCacheMutex) - { - if(_outgoingCache == null) - { - out = new IceInternal.Outgoing(this, reference, operation, mode, context); - } - else - { - out = _outgoingCache; - _outgoingCache = _outgoingCache.next; - out.reset(reference, operation, mode, context); - out.next = null; - } - } - - return out; - } - - public void - reclaimOutgoing(IceInternal.Outgoing out) - { - synchronized(_outgoingCacheMutex) - { - out.next = _outgoingCache; - _outgoingCache = out; - } - } - - private class ThreadPerConnection extends Thread - { - ThreadPerConnection(Connection connection) - { - _connection = connection; - } - - public void - run() - { - try - { - _connection.run(); - } - catch(Exception ex) - { - _connection.error("exception in thread per connection", ex); - } - } - - Connection _connection; - } - private Thread _threadPerConnection; - - private IceInternal.Instance _instance; - private IceInternal.Transceiver _transceiver; - private /*final*/ String _desc; - private /*final*/ String _type; - private /*final*/ IceInternal.Endpoint _endpoint; - - private /*final*/ IceInternal.BasicStream _stream; - private /*final*/ IceInternal.Incoming _in; - - private /*final*/ Logger _logger; - private /*final*/ IceInternal.TraceLevels _traceLevels; - - private /*final*/ boolean _warn; - - private LocalException _exception; - - private boolean _batchAutoFlush; - private IceInternal.BasicStream _batchStream; - private boolean _batchStreamInUse; - private int _batchRequestNum; - private int _batchMarker; - - private int _dispatchCount; - - private int _state; // The current state. - private long _stateTime; // The last time when the state was changed. - - private boolean _blocking; - - // - // We have a separate mutex for sending, so that we don't block - // the whole connection when we do a blocking send. The monitor - // is also used by outgoing calls to wait for replies when thread - // per connection is used. The _nextRequestId, _requests and - // _requestsHint attributes are also protected by this monitor. - // Calls on the (non thread-safe) Outgoing objects should also - // only be made with this monitor locked. - // - // Finally, it's safe to lock the _sendMonitor with the connection - // already locked. The contrary isn't permitted. - // - private java.lang.Object _sendMonitor = new java.lang.Object(); - private int _nextRequestId; - private IceInternal.IntMap _requests = new IceInternal.IntMap(); - - private IceInternal.Outgoing _outgoingCache; - private java.lang.Object _outgoingCacheMutex = new java.lang.Object(); -} |