summaryrefslogtreecommitdiff
path: root/java/src/Ice/ConnectionI.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r--java/src/Ice/ConnectionI.java3178
1 files changed, 0 insertions, 3178 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
deleted file mode 100644
index c0b74d70bac..00000000000
--- a/java/src/Ice/ConnectionI.java
+++ /dev/null
@@ -1,3178 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package Ice;
-
-public final class ConnectionI extends IceInternal.EventHandler
- implements Connection, IceInternal.ResponseHandler, IceInternal.CancellationHandler
-{
- public interface StartCallback
- {
- void connectionStartCompleted(ConnectionI connection);
-
- void connectionStartFailed(ConnectionI connection, Ice.LocalException ex);
- }
-
- private class TimeoutCallback implements Runnable
- {
- @Override
- public void run()
- {
- timedOut();
- }
- }
-
- public void start(StartCallback callback)
- {
- try
- {
- synchronized(this)
- {
- // The connection might already be closed if the communicator
- // was destroyed.
- if(_state >= StateClosed)
- {
- assert (_exception != null);
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
-
- if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None))
- {
- _startCallback = callback;
- return;
- }
-
- //
- // We start out in holding state.
- //
- setState(StateHolding);
- }
- }
- catch(Ice.LocalException ex)
- {
- exception(ex);
- callback.connectionStartFailed(this, _exception);
- return;
- }
-
- callback.connectionStartCompleted(this);
- }
-
- public void startAndWait() throws InterruptedException
- {
- try
- {
- synchronized(this)
- {
- // The connection might already be closed if the communicator
- // was destroyed.
- if(_state >= StateClosed)
- {
- assert (_exception != null);
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
-
- if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None))
- {
- while(_state <= StateNotValidated)
- {
- wait();
- }
-
- if(_state >= StateClosing)
- {
- assert (_exception != null);
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
- }
-
- //
- // We start out in holding state.
- //
- setState(StateHolding);
- }
- }
- catch(Ice.LocalException ex)
- {
- exception(ex);
- waitUntilFinished();
- return;
- }
- }
-
- public synchronized void activate()
- {
- if(_state <= StateNotValidated)
- {
- return;
- }
-
- if(_acmLastActivity > 0)
- {
- _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
- }
-
- setState(StateActive);
- }
-
- public synchronized void hold()
- {
- if(_state <= StateNotValidated)
- {
- return;
- }
-
- setState(StateHolding);
- }
-
- // DestructionReason.
- public final static int ObjectAdapterDeactivated = 0;
- public final static int CommunicatorDestroyed = 1;
-
- synchronized public void destroy(int reason)
- {
- switch(reason)
- {
- case ObjectAdapterDeactivated:
- {
- setState(StateClosing, new ObjectAdapterDeactivatedException());
- break;
- }
-
- case CommunicatorDestroyed:
- {
- setState(StateClosing, new CommunicatorDestroyedException());
- break;
- }
- }
- }
-
- @Override
- synchronized public void close(boolean force)
- {
- if(Thread.interrupted())
- {
- throw new Ice.OperationInterruptedException();
- }
-
- if(force)
- {
- setState(StateClosed, new ForcedCloseConnectionException());
- }
- else
- {
- //
- // If we do a graceful shutdown, then we wait until all
- // outstanding requests have been completed. Otherwise,
- // the CloseConnectionException will cause all outstanding
- // requests to be retried, regardless of whether the
- // server has processed them or not.
- //
- while(!_asyncRequests.isEmpty())
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- throw new Ice.OperationInterruptedException();
- }
- }
-
- setState(StateClosing, new CloseConnectionException());
- }
- }
-
- public synchronized boolean isActiveOrHolding()
- {
- return _state > StateNotValidated && _state < StateClosing;
- }
-
- public synchronized boolean isFinished()
- {
- if(_state != StateFinished || _dispatchCount != 0)
- {
- return false;
- }
-
- assert (_state == StateFinished);
- return true;
- }
-
- public synchronized void throwException()
- {
- if(_exception != null)
- {
- assert (_state >= StateClosing);
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
- }
-
- public synchronized void waitUntilHolding() throws InterruptedException
- {
- while(_state < StateHolding || _dispatchCount > 0)
- {
- wait();
- }
- }
-
- public synchronized void waitUntilFinished() throws InterruptedException
- {
- //
- // We wait indefinitely until the connection is finished and all
- // outstanding requests are completed. Otherwise we couldn't
- // guarantee that there are no outstanding calls when deactivate()
- // is called on the servant locators.
- //
- while(_state < StateFinished || _dispatchCount > 0)
- {
- wait();
- }
-
- assert (_state == StateFinished);
-
- //
- // Clear the OA. See bug 1673 for the details of why this is necessary.
- //
- _adapter = null;
- }
-
- synchronized public void updateObserver()
- {
- if(_state < StateNotValidated || _state > StateClosed)
- {
- return;
- }
-
- assert (_instance.initializationData().observer != null);
- _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(),
- _endpoint,
- toConnectionState(_state),
- _observer);
- if(_observer != null)
- {
- _observer.attach();
- }
- else
- {
- _writeStreamPos = -1;
- _readStreamPos = -1;
- }
- }
-
- synchronized public void monitor(long now, IceInternal.ACMConfig acm)
- {
- if(_state != StateActive)
- {
- return;
- }
-
- if(_readStream.size() > IceInternal.Protocol.headerSize || !_writeStream.isEmpty())
- {
- //
- // If writing or reading, nothing to do, the connection
- // timeout will kick-in if writes or reads don't progress.
- // This check is necessary because the activity timer is
- // only set when a message is fully read/written.
- //
- return;
- }
-
- //
- // We send a heartbeat if there was no activity in the last
- // (timeout / 4) period. Sending a heartbeat sooner than
- // really needed is safer to ensure that the receiver will
- // receive in time the heartbeat. Sending the heartbeat if
- // there was no activity in the last (timeout / 2) period
- // isn't enough since monitor() is called only every (timeout
- // / 2) period.
- //
- // Note that this doesn't imply that we are sending 4
- // heartbeats per timeout period because the monitor() method
- // is sill only called every (timeout / 2) period.
- //
-
- if(acm.heartbeat == ACMHeartbeat.HeartbeatAlways ||
- (acm.heartbeat != ACMHeartbeat.HeartbeatOff && now >= (_acmLastActivity + acm.timeout / 4)))
- {
- if(acm.heartbeat != ACMHeartbeat.HeartbeatOnInvocation || _dispatchCount > 0)
- {
- heartbeat();
- }
- }
-
- if(acm.close != ACMClose.CloseOff && now >= (_acmLastActivity + acm.timeout))
- {
- if(acm.close == ACMClose.CloseOnIdleForceful ||
- (acm.close != ACMClose.CloseOnIdle && (!_asyncRequests.isEmpty())))
- {
- //
- // Close the connection if we didn't receive a heartbeat in
- // the last period.
- //
- setState(StateClosed, new ConnectionTimeoutException());
- }
- else if(acm.close != ACMClose.CloseOnInvocation && _dispatchCount == 0 && _batchStream.isEmpty() &&
- _asyncRequests.isEmpty())
- {
- //
- // The connection is idle, close it.
- //
- setState(StateClosing, new ConnectionTimeoutException());
- }
- }
- }
-
- synchronized public int sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response)
- throws IceInternal.RetryException
- {
- final IceInternal.BasicStream os = out.getOs();
-
- if(_exception != null)
- {
- //
- // If the connection is closed before we even have a chance
- // to send our request, we always try to send the request
- // again.
- //
- throw new IceInternal.RetryException((Ice.LocalException) _exception.fillInStackTrace());
- }
-
- assert (_state > StateNotValidated);
- assert (_state < StateClosing);
-
- //
- // Ensure the message isn't bigger than what we can send with the
- // transport.
- //
- _transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax());
-
- //
- // Notify the request that it's cancelable with this connection.
- // This will throw if the request is canceled.
- //
- out.cancelable(this);
-
- int requestId = 0;
- if(response)
- {
- //
- // Create a new unique request ID.
- //
- requestId = _nextRequestId++;
- if(requestId <= 0)
- {
- _nextRequestId = 1;
- requestId = _nextRequestId++;
- }
-
- //
- // Fill in the request ID.
- //
- os.pos(IceInternal.Protocol.headerSize);
- os.writeInt(requestId);
- }
-
- out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
-
- int status;
- try
- {
- status = sendMessage(new OutgoingMessage(out, os, compress, requestId));
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- assert (_exception != null);
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
-
- if(response)
- {
- //
- // Add to the async requests map.
- //
- _asyncRequests.put(requestId, out);
- }
- return status;
- }
-
- public synchronized void prepareBatchRequest(IceInternal.BasicStream os) throws IceInternal.RetryException
- {
- waitBatchStreamInUse();
-
- if(_exception != null)
- {
- //
- // If there were no batch requests queued when the connection
- // failed, we can safely retry with a new connection. Otherwise, we
- // must throw to notify the caller that some previous batch requests
- // were not sent.
- //
- if(_batchStream.isEmpty())
- {
- throw new IceInternal.RetryException((Ice.LocalException) _exception.fillInStackTrace());
- }
- else
- {
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
- }
-
- assert (_state > StateNotValidated);
- assert (_state < StateClosing);
-
- if(_batchStream.isEmpty())
- {
- try
- {
- _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr);
- }
- catch(LocalException ex)
- {
- setState(StateClosed, ex);
- throw ex;
- }
- }
-
- _batchStreamInUse = true;
- _batchMarker = _batchStream.size();
- _batchStream.swap(os);
-
- //
- // The batch stream now belongs to the caller, until
- // finishBatchRequest() or abortBatchRequest() is called.
- //
- }
-
- public void finishBatchRequest(IceInternal.BasicStream os, boolean compress)
- {
- try
- {
- synchronized(this)
- {
- //
- // Get the batch stream back.
- //
- _batchStream.swap(os);
-
- if(_exception != null)
- {
- return;
- }
-
- boolean flush = false;
- if(_batchAutoFlush)
- {
- //
- // Throw memory limit exception if the first message added
- // causes us to go over limit. Otherwise put aside the
- // marshalled message that caused limit to be exceeded and
- // rollback stream to the marker.
- //
- try
- {
- _transceiver.checkSendSize(_batchStream.getBuffer(), _instance.messageSizeMax());
- }
- catch(Ice.LocalException ex)
- {
- if(_batchRequestNum > 0)
- {
- flush = true;
- }
- else
- {
- throw ex;
- }
- }
- }
-
- if(flush)
- {
- //
- // Temporarily save the last request.
- //
- byte[] lastRequest = new byte[_batchStream.size() - _batchMarker];
- IceInternal.Buffer buffer = _batchStream.getBuffer();
- buffer.b.position(_batchMarker);
- buffer.b.get(lastRequest);
- _batchStream.resize(_batchMarker, false);
-
- //
- // Send the batch stream without the last request.
- //
- try
- {
- //
- // Fill in the number of requests in the batch.
- //
- _batchStream.pos(IceInternal.Protocol.headerSize);
- _batchStream.writeInt(_batchRequestNum);
-
- OutgoingMessage message = new OutgoingMessage(_batchStream, _batchRequestCompress, true);
- sendMessage(message);
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- assert (_exception != null);
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
-
- //
- // Reset the batch stream.
- //
- _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding,
- _batchAutoFlush);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
-
- //
- // Check again if the last request doesn't exceed the
- // maximum message size.
- //
- if(IceInternal.Protocol.requestBatchHdr.length + lastRequest.length > _instance.messageSizeMax())
- {
- IceInternal.Ex.throwMemoryLimitException(IceInternal.Protocol.requestBatchHdr.length +
- lastRequest.length, _instance.messageSizeMax());
- }
-
- //
- // Start a new batch with the last message that caused us to
- // go over the limit.
- //
- _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr);
- _batchStream.writeBlob(lastRequest);
- }
-
- //
- // Increment the number of requests in the batch.
- //
- ++_batchRequestNum;
-
- //
- // We compress the whole batch if there is at least one
- // compressed
- // message.
- //
- if(compress)
- {
- _batchRequestCompress = true;
- }
-
- //
- // Notify about the batch stream not being in use anymore.
- //
- assert (_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
- }
- }
- catch(Ice.LocalException ex)
- {
- abortBatchRequest();
- throw ex;
- }
- }
-
- public synchronized void abortBatchRequest()
- {
- _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding,
- _batchAutoFlush);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
-
- assert (_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
- }
-
- @Override
- public void flushBatchRequests()
- {
- end_flushBatchRequests(begin_flushBatchRequests());
- }
-
- private static final String __flushBatchRequests_name = "flushBatchRequests";
-
- @Override
- public Ice.AsyncResult begin_flushBatchRequests()
- {
- return begin_flushBatchRequestsInternal(null);
- }
-
- @Override
- public Ice.AsyncResult begin_flushBatchRequests(Callback cb)
- {
- return begin_flushBatchRequestsInternal(cb);
- }
-
- @Override
- public Ice.AsyncResult begin_flushBatchRequests(Callback_Connection_flushBatchRequests cb)
- {
- return begin_flushBatchRequestsInternal(cb);
- }
-
- @Override
- public AsyncResult begin_flushBatchRequests(IceInternal.Functional_VoidCallback __responseCb,
- IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
- IceInternal.Functional_BoolCallback __sentCb)
- {
- return begin_flushBatchRequestsInternal(new IceInternal.Functional_CallbackBase(false, __exceptionCb, __sentCb)
- {
- @Override
- public final void __completed(AsyncResult __result)
- {
- try
- {
- __result.getConnection().end_flushBatchRequests(__result);
- }
- catch(Exception __ex)
- {
- __exceptionCb.apply(__ex);
- }
- }
- });
- }
-
- private Ice.AsyncResult begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb)
- {
- IceInternal.ConnectionFlushBatch result =
- new IceInternal.ConnectionFlushBatch(this, _communicator, _instance, __flushBatchRequests_name, cb);
- result.invoke();
- return result;
- }
-
- @Override
- public void end_flushBatchRequests(AsyncResult ir)
- {
- IceInternal.ConnectionFlushBatch r =
- IceInternal.ConnectionFlushBatch.check(ir, this, __flushBatchRequests_name);
- r.__wait();
- }
-
- synchronized public int flushAsyncBatchRequests(IceInternal.OutgoingAsyncBase outAsync)
- {
- waitBatchStreamInUse();
-
- if(_exception != null)
- {
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
-
- if(_batchRequestNum == 0)
- {
- int status = IceInternal.AsyncStatus.Sent;
- if(outAsync.sent())
- {
- status |= IceInternal.AsyncStatus.InvokeSentCallback;
- }
- return status;
- }
-
- //
- // Notify the request that it's cancelable with this connection.
- // This will throw if the request is canceled.
- //
- outAsync.cancelable(this);
-
- //
- // Fill in the number of requests in the batch.
- //
- _batchStream.pos(IceInternal.Protocol.headerSize);
- _batchStream.writeInt(_batchRequestNum);
-
- _batchStream.swap(outAsync.getOs());
-
- outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0);
-
- //
- // Send the batch stream.
- //
- int status;
- try
- {
- OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.getOs(), _batchRequestCompress, 0);
- status = sendMessage(message);
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- assert (_exception != null);
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
-
- //
- // Reset the batch stream.
- //
- _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding,
- _batchAutoFlush);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
- return status;
- }
-
- @Override
- synchronized public void setCallback(final ConnectionCallback callback)
- {
- synchronized(this)
- {
- if(_state >= StateClosed)
- {
- if(callback != null)
- {
- _threadPool.dispatch(new IceInternal.DispatchWorkItem(this)
- {
- @Override
- public void run()
- {
- try
- {
- callback.closed(ConnectionI.this);
- }
- catch(Exception ex)
- {
- _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
- }
- }
- });
- }
- }
- else
- {
- _callback = callback;
- }
- }
- }
-
- @Override
- synchronized public void setACM(Ice.IntOptional timeout, Ice.Optional<ACMClose> close,
- Ice.Optional<ACMHeartbeat> heartbeat)
- {
- if(_monitor == null || _state >= StateClosed)
- {
- return;
- }
-
- if(_state == StateActive)
- {
- _monitor.remove(this);
- }
- _monitor = _monitor.acm(timeout, close, heartbeat);
-
- if(_monitor.getACM().timeout <= 0)
- {
- _acmLastActivity = -1; // Disable the recording of last activity.
- }
- else if(_state == StateActive && _acmLastActivity == -1)
- {
- _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
- }
-
- if(_state == StateActive)
- {
- _monitor.add(this);
- }
- }
-
- @Override
- synchronized public Ice.ACM getACM()
- {
- return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff);
- }
-
- @Override
- synchronized public void asyncRequestCanceled(IceInternal.OutgoingAsyncBase outAsync, Ice.LocalException ex)
- {
- if(_state >= StateClosed)
- {
- return; // The request has already been or will be shortly notified of the failure.
- }
-
- java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
- while(it.hasNext())
- {
- OutgoingMessage o = it.next();
- if(o.outAsync == outAsync)
- {
- if(o.requestId > 0)
- {
- _asyncRequests.remove(o.requestId);
- }
-
- if(ex instanceof ConnectionTimeoutException)
- {
- setState(StateClosed, ex);
- }
- else
- {
- //
- // If the request is being sent, don't remove it from the send
- // streams, it will be removed once the sending is finished.
- //
- // Note that since we swapped the message stream to _writeStream
- // it's fine if the OutgoingAsync output stream is released (and
- // as long as canceled requests cannot be retried).
- //
- o.canceled();
- if(o != _sendStreams.getFirst())
- {
- it.remove();
- }
- if(outAsync.completed(ex))
- {
- outAsync.invokeCompletedAsync();
- }
- }
- return;
- }
- }
-
- if(outAsync instanceof IceInternal.OutgoingAsync)
- {
- IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync) outAsync;
- java.util.Iterator<IceInternal.OutgoingAsync> it2 = _asyncRequests.values().iterator();
- while(it2.hasNext())
- {
- if(it2.next() == o)
- {
- if(ex instanceof ConnectionTimeoutException)
- {
- setState(StateClosed, ex);
- }
- else
- {
- it2.remove();
- if(outAsync.completed(ex))
- {
- outAsync.invokeCompletedAsync();
- }
- }
- return;
- }
- }
- }
- }
-
- @Override
- synchronized public void sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag)
- {
- assert (_state > StateNotValidated);
-
- try
- {
- if(--_dispatchCount == 0)
- {
- if(_state == StateFinished)
- {
- reap();
- }
- notifyAll();
- }
-
- if(_state >= StateClosed)
- {
- assert (_exception != null);
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
-
- sendMessage(new OutgoingMessage(os, compressFlag != 0, true));
-
- if(_state == StateClosing && _dispatchCount == 0)
- {
- initiateShutdown();
- }
- }
- catch(LocalException ex)
- {
- setState(StateClosed, ex);
- }
- }
-
- @Override
- synchronized public void sendNoResponse()
- {
- assert (_state > StateNotValidated);
- try
- {
- if(--_dispatchCount == 0)
- {
- if(_state == StateFinished)
- {
- reap();
- }
- notifyAll();
- }
-
- if(_state >= StateClosed)
- {
- assert (_exception != null);
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
-
- if(_state == StateClosing && _dispatchCount == 0)
- {
- initiateShutdown();
- }
- }
- catch(LocalException ex)
- {
- setState(StateClosed, ex);
- }
- }
-
- @Override
- public boolean systemException(int requestId, Ice.SystemException ex)
- {
- return false; // System exceptions aren't marshalled.
- }
-
- public IceInternal.EndpointI endpoint()
- {
- return _endpoint; // No mutex protection necessary, _endpoint is
- // immutable.
- }
-
- public IceInternal.Connector connector()
- {
- return _connector; // No mutex protection necessary, _connector is
- // immutable.
- }
-
- @Override
- public synchronized void setAdapter(ObjectAdapter adapter)
- {
- if(_state <= StateNotValidated || _state >= StateClosing)
- {
- return;
- }
-
- _adapter = adapter;
-
- if(_adapter != null)
- {
- _servantManager = ((ObjectAdapterI) _adapter).getServantManager();
- if(_servantManager == null)
- {
- _adapter = null;
- }
- }
- else
- {
- _servantManager = null;
- }
-
- //
- // We never change the thread pool with which we were
- // initially registered, even if we add or remove an object
- // adapter.
- //
- }
-
- @Override
- public synchronized ObjectAdapter getAdapter()
- {
- return _adapter;
- }
-
- @Override
- public Endpoint getEndpoint()
- {
- return _endpoint; // No mutex protection necessary, _endpoint is
- // immutable.
- }
-
- @Override
- public ObjectPrx createProxy(Identity ident)
- {
- //
- // Create a reference and return a reverse proxy for this
- // reference.
- //
- return _instance.proxyFactory().referenceToProxy(_instance.referenceFactory().create(ident, this));
- }
-
- //
- // Operations from EventHandler
- //
- @Override
- public void message(IceInternal.ThreadPoolCurrent current)
- {
- StartCallback startCB = null;
- java.util.List<OutgoingMessage> sentCBs = null;
- MessageInfo info = null;
- int dispatchCount = 0;
-
- synchronized(this)
- {
- if(_state >= StateClosed)
- {
- return;
- }
-
- if(!current.ioReady())
- {
- return;
- }
-
- int readyOp = current.operation;
- try
- {
- unscheduleTimeout(current.operation);
-
- int writeOp = IceInternal.SocketOperation.None;
- int readOp = IceInternal.SocketOperation.None;
-
- if((readyOp & IceInternal.SocketOperation.Write) != 0)
- {
- final IceInternal.Buffer buf = _writeStream.getBuffer();
- if(_observer != null)
- {
- observerStartWrite(buf);
- }
- writeOp = write(buf);
- if(_observer != null && (writeOp & IceInternal.SocketOperation.Write) == 0)
- {
- observerFinishWrite(buf);
- }
- }
-
- while((readyOp & IceInternal.SocketOperation.Read) != 0)
- {
- final IceInternal.Buffer buf = _readStream.getBuffer();
- if(_observer != null && !_readHeader)
- {
- observerStartRead(buf);
- }
-
- readOp = read(buf);
- if((readOp & IceInternal.SocketOperation.Read) != 0)
- {
- break;
- }
- if(_observer != null && !_readHeader)
- {
- assert (!buf.b.hasRemaining());
- observerFinishRead(buf);
- }
-
- if(_readHeader) // Read header if necessary.
- {
- _readHeader = false;
-
- if(_observer != null)
- {
- _observer.receivedBytes(IceInternal.Protocol.headerSize);
- }
-
- int pos = _readStream.pos();
- if(pos < IceInternal.Protocol.headerSize)
- {
- //
- // This situation is possible for small UDP packets.
- //
- throw new Ice.IllegalMessageSizeException();
- }
-
- _readStream.pos(0);
- byte[] m = new byte[4];
- m[0] = _readStream.readByte();
- m[1] = _readStream.readByte();
- m[2] = _readStream.readByte();
- m[3] = _readStream.readByte();
- if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] ||
- m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
- {
- Ice.BadMagicException ex = new Ice.BadMagicException();
- ex.badMagic = m;
- throw ex;
- }
-
- _readProtocol.__read(_readStream);
- IceInternal.Protocol.checkSupportedProtocol(_readProtocol);
-
- _readProtocolEncoding.__read(_readStream);
- IceInternal.Protocol.checkSupportedProtocolEncoding(_readProtocolEncoding);
-
- _readStream.readByte(); // messageType
- _readStream.readByte(); // compress
- int size = _readStream.readInt();
- if(size < IceInternal.Protocol.headerSize)
- {
- throw new Ice.IllegalMessageSizeException();
- }
- if(size > _instance.messageSizeMax())
- {
- IceInternal.Ex.throwMemoryLimitException(size, _instance.messageSizeMax());
- }
- if(size > _readStream.size())
- {
- _readStream.resize(size, true);
- }
- _readStream.pos(pos);
- }
-
- if(_readStream.pos() != _readStream.size())
- {
- if(_endpoint.datagram())
- {
- // The message was truncated.
- throw new Ice.DatagramLimitException();
- }
- continue;
- }
- break;
- }
-
- int newOp = readOp | writeOp;
- readyOp = readyOp & ~newOp;
- assert (readyOp != 0 || newOp != 0);
-
- if(_state <= StateNotValidated)
- {
- if(newOp != 0)
- {
- //
- // Wait for all the transceiver conditions to be
- // satisfied before continuing.
- //
- scheduleTimeout(newOp);
- _threadPool.update(this, current.operation, newOp);
- return;
- }
-
- if(_state == StateNotInitialized && !initialize(current.operation))
- {
- return;
- }
-
- if(_state <= StateNotValidated && !validate(current.operation))
- {
- return;
- }
-
- _threadPool.unregister(this, current.operation);
-
- //
- // We start out in holding state.
- //
- setState(StateHolding);
- if(_startCallback != null)
- {
- startCB = _startCallback;
- _startCallback = null;
- if(startCB != null)
- {
- ++dispatchCount;
- }
- }
- }
- else
- {
- assert (_state <= StateClosingPending);
-
- //
- // We parse messages first, if we receive a close
- // connection message we won't send more messages.
- //
- if((readyOp & IceInternal.SocketOperation.Read) != 0)
- {
- // Optimization: use the thread's stream.
- info = new MessageInfo(current.stream);
- newOp |= parseMessage(info);
- dispatchCount += info.messageDispatchCount;
- }
-
- if((readyOp & IceInternal.SocketOperation.Write) != 0)
- {
- sentCBs = new java.util.LinkedList<OutgoingMessage>();
- newOp |= sendNextMessage(sentCBs);
- if(!sentCBs.isEmpty())
- {
- ++dispatchCount;
- }
- else
- {
- sentCBs = null;
- }
- }
-
- if(_state < StateClosed)
- {
- scheduleTimeout(newOp);
- _threadPool.update(this, current.operation, newOp);
- }
- }
-
- if(_acmLastActivity > 0)
- {
- _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
- }
-
- if(dispatchCount == 0)
- {
- return; // Nothing to dispatch we're done!
- }
-
- _dispatchCount += dispatchCount;
- current.ioCompleted();
- }
- catch(DatagramLimitException ex) // Expected.
- {
- if(_warnUdp)
- {
- _logger.warning("maximum datagram size of " + _readStream.pos() + " exceeded");
- }
- _readStream.resize(IceInternal.Protocol.headerSize, true);
- _readStream.pos(0);
- _readHeader = true;
- return;
- }
- catch(SocketException ex)
- {
- setState(StateClosed, ex);
- return;
- }
- catch(LocalException ex)
- {
- if(_endpoint.datagram())
- {
- if(_warn)
- {
- String s = "datagram connection exception:\n" + ex + '\n' + _desc;
- _logger.warning(s);
- }
- _readStream.resize(IceInternal.Protocol.headerSize, true);
- _readStream.pos(0);
- _readHeader = true;
- }
- else
- {
- setState(StateClosed, ex);
- }
- return;
- }
- }
-
- if(!_dispatcher) // Optimization, call dispatch() directly if there's no
- // dispatcher.
- {
- dispatch(startCB, sentCBs, info);
- }
- else
- {
- // No need for the stream if heartbeat callback
- if(info != null && info.heartbeatCallback == null)
- {
- //
- // Create a new stream for the dispatch instead of using the
- // thread pool's thread stream.
- //
- assert (info.stream == current.stream);
- IceInternal.BasicStream stream = info.stream;
- info.stream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
- info.stream.swap(stream);
- }
-
- final StartCallback finalStartCB = startCB;
- final java.util.List<OutgoingMessage> finalSentCBs = sentCBs;
- final MessageInfo finalInfo = info;
- _threadPool.dispatchFromThisThread(new IceInternal.DispatchWorkItem(this)
- {
- @Override
- public void run()
- {
- dispatch(finalStartCB, finalSentCBs, finalInfo);
- }
- });
- }
- }
-
- protected void dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info)
- {
- int dispatchedCount = 0;
-
- //
- // Notify the factory that the connection establishment and
- // validation has completed.
- //
- if(startCB != null)
- {
- startCB.connectionStartCompleted(this);
- ++dispatchedCount;
- }
-
- //
- // Notify AMI calls that the message was sent.
- //
- if(sentCBs != null)
- {
- for(OutgoingMessage msg : sentCBs)
- {
- msg.outAsync.invokeSent();
- }
- ++dispatchedCount;
- }
-
- if(info != null)
- {
- //
- // Asynchronous replies must be handled outside the thread
- // synchronization, so that nested calls are possible.
- //
- if(info.outAsync != null)
- {
- info.outAsync.invokeCompleted();
- ++dispatchedCount;
- }
-
- if(info.heartbeatCallback != null)
- {
- try
- {
- info.heartbeatCallback.heartbeat(this);
- }
- catch(Exception ex)
- {
- _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
- }
- ++dispatchedCount;
- }
-
- //
- // Method invocation (or multiple invocations for batch messages)
- // must be done outside the thread synchronization, so that nested
- // calls are possible.
- //
- if(info.invokeNum > 0)
- {
- invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter);
-
- //
- // Don't increase dispatchedCount, the dispatch count is
- // decreased when the incoming reply is sent.
- //
- }
- }
-
- //
- // Decrease dispatch count.
- //
- if(dispatchedCount > 0)
- {
- synchronized(this)
- {
- _dispatchCount -= dispatchedCount;
- if(_dispatchCount == 0)
- {
- //
- // Only initiate shutdown if not already done. It might
- // have already been done if the sent callback or AMI
- // callback was dispatched when the connection was already
- // in the closing state.
- //
- if(_state == StateClosing)
- {
- try
- {
- initiateShutdown();
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- }
- }
- else if(_state == StateFinished)
- {
- reap();
- }
- notifyAll();
- }
- }
- }
- }
-
- @Override
- public void finished(IceInternal.ThreadPoolCurrent current, final boolean close)
- {
- synchronized(this)
- {
- assert (_state == StateClosed);
- unscheduleTimeout(IceInternal.SocketOperation.Read | IceInternal.SocketOperation.Write);
- }
-
- //
- // If there are no callbacks to call, we don't call ioCompleted() since
- // we're not going to call code that will potentially block (this avoids
- // promoting a new leader and unecessary thread creation, especially if
- // this is called on shutdown).
- //
- if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty() && _callback == null)
- {
- finish(close);
- return;
- }
-
- current.ioCompleted();
- if(!_dispatcher) // Optimization, call finish() directly if there's no
- // dispatcher.
- {
- finish(close);
- }
- else
- {
- _threadPool.dispatchFromThisThread(new IceInternal.DispatchWorkItem(this)
- {
- @Override
- public void run()
- {
- finish(close);
- }
- });
- }
- }
-
- public void finish(boolean close)
- {
- if(!_initialized)
- {
- if(_instance.traceLevels().network >= 2)
- {
- StringBuffer s = new StringBuffer("failed to ");
- s.append(_connector != null ? "establish" : "accept");
- s.append(" ");
- s.append(_endpoint.protocol());
- s.append(" connection\n");
- s.append(toString());
- s.append("\n");
- s.append(_exception);
- _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.toString());
- }
- }
- else
- {
- if(_instance.traceLevels().network >= 1)
- {
- StringBuffer s = new StringBuffer("closed ");
- s.append(_endpoint.protocol());
- s.append(" connection\n");
- s.append(toString());
- _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.toString());
- }
- }
-
- if(close)
- {
- _transceiver.close();
- }
-
- if(_startCallback != null)
- {
- _startCallback.connectionStartFailed(this, _exception);
- _startCallback = null;
- }
-
- if(!_sendStreams.isEmpty())
- {
- if(!_writeStream.isEmpty())
- {
- //
- // Return the stream to the outgoing call. This is important for
- // retriable AMI calls which are not marshalled again.
- //
- OutgoingMessage message = _sendStreams.getFirst();
- _writeStream.swap(message.stream);
- }
-
- for(OutgoingMessage p : _sendStreams)
- {
- p.completed(_exception);
- if(p.requestId > 0) // Make sure finished isn't called twice.
- {
- _asyncRequests.remove(p.requestId);
- }
- }
- _sendStreams.clear();
- }
-
- for(IceInternal.OutgoingAsync p : _asyncRequests.values())
- {
- if(p.completed(_exception))
- {
- p.invokeCompleted();
- }
- }
- _asyncRequests.clear();
-
- if(_callback != null)
- {
- try
- {
- _callback.closed(this);
- }
- catch(Exception ex)
- {
- _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
- }
- _callback = null;
- }
-
- //
- // This must be done last as this will cause waitUntilFinished() to
- // return (and communicator objects such as the timer might be destroyed
- // too).
- //
- synchronized(this)
- {
- setState(StateFinished);
-
- if(_dispatchCount == 0)
- {
- reap();
- }
- }
- }
-
- @Override
- public String toString()
- {
- return _toString();
- }
-
- @Override
- public java.nio.channels.SelectableChannel fd()
- {
- return _transceiver.fd();
- }
-
- public synchronized void timedOut()
- {
- if(_state <= StateNotValidated)
- {
- setState(StateClosed, new ConnectTimeoutException());
- }
- else if(_state < StateClosing)
- {
- setState(StateClosed, new TimeoutException());
- }
- else if(_state < StateClosed)
- {
- setState(StateClosed, new CloseTimeoutException());
- }
- }
-
- @Override
- public String type()
- {
- return _type; // No mutex lock, _type is immutable.
- }
-
- @Override
- public int timeout()
- {
- return _endpoint.timeout(); // No mutex protection necessary, _endpoint
- // is immutable.
- }
-
- @Override
- public synchronized ConnectionInfo getInfo()
- {
- if(_state >= StateClosed)
- {
- throw (Ice.LocalException) _exception.fillInStackTrace();
- }
- return initConnectionInfo();
- }
-
- @Override
- public String _toString()
- {
- return _desc; // No mutex lock, _desc is immutable.
- }
-
- public synchronized void exception(LocalException ex)
- {
- setState(StateClosed, ex);
- }
-
- @Override
- public synchronized void invokeException(int requestId, LocalException ex, int invokeNum)
- {
- //
- // Fatal exception while invoking a request. Since
- // sendResponse/sendNoResponse isn't
- // called in case of a fatal exception we decrement _dispatchCount here.
- //
-
- setState(StateClosed, ex);
-
- if(invokeNum > 0)
- {
- assert (_dispatchCount > 0);
- _dispatchCount -= invokeNum;
- assert (_dispatchCount >= 0);
- if(_dispatchCount == 0)
- {
- if(_state == StateFinished)
- {
- reap();
- }
- notifyAll();
- }
- }
- }
-
- public ConnectionI(Communicator communicator, IceInternal.Instance instance, IceInternal.ACMMonitor monitor,
- IceInternal.Transceiver transceiver, IceInternal.Connector connector, IceInternal.EndpointI endpoint,
- ObjectAdapter adapter)
- {
- _communicator = communicator;
- _instance = instance;
- _monitor = monitor;
- _transceiver = transceiver;
- _desc = transceiver.toString();
- _type = transceiver.protocol();
- _connector = connector;
- _endpoint = endpoint;
- _adapter = adapter;
- final Ice.InitializationData initData = instance.initializationData();
- // Cached for better performance.
- _dispatcher = initData.dispatcher != null;
- _logger = initData.logger; // Cached for better performance.
- _traceLevels = instance.traceLevels(); // Cached for better performance.
- _timer = instance.timer();
- _writeTimeout = new TimeoutCallback();
- _writeTimeoutFuture = null;
- _readTimeout = new TimeoutCallback();
- _readTimeoutFuture = null;
- _warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0;
- _warnUdp = instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
- _cacheBuffers = instance.cacheMessageBuffers();
- if(_monitor != null && _monitor.getACM().timeout > 0)
- {
- _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
- }
- else
- {
- _acmLastActivity = -1;
- }
- _nextRequestId = 1;
- _batchAutoFlush = initData.properties.getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0 ? true : false;
- _batchStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding,
- _batchAutoFlush);
- _batchStreamInUse = false;
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
- _readStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding);
- _readHeader = false;
- _readStreamPos = -1;
- _writeStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding);
- _writeStreamPos = -1;
- _dispatchCount = 0;
- _state = StateNotInitialized;
-
- int compressionLevel = initData.properties.getPropertyAsIntWithDefault("Ice.Compression.Level", 1);
- if(compressionLevel < 1)
- {
- compressionLevel = 1;
- }
- else if(compressionLevel > 9)
- {
- compressionLevel = 9;
- }
- _compressionLevel = compressionLevel;
-
- if(_adapter != null)
- {
- _servantManager = ((ObjectAdapterI) _adapter).getServantManager();
- }
- else
- {
- _servantManager = null;
- }
-
- try
- {
- if(_adapter != null)
- {
- _threadPool = ((ObjectAdapterI) _adapter).getThreadPool();
- }
- else
- {
- _threadPool = _instance.clientThreadPool();
- }
- _threadPool.initialize(this);
- }
- catch(Ice.LocalException ex)
- {
- throw ex;
- }
- catch(java.lang.Exception ex)
- {
- throw new Ice.SyscallException(ex);
- }
- }
-
- @Override
- protected synchronized void finalize() throws Throwable
- {
- try
- {
- IceUtilInternal.Assert.FinalizerAssert(_startCallback == null);
- IceUtilInternal.Assert.FinalizerAssert(_state == StateFinished);
- IceUtilInternal.Assert.FinalizerAssert(_dispatchCount == 0);
- IceUtilInternal.Assert.FinalizerAssert(_sendStreams.isEmpty());
- IceUtilInternal.Assert.FinalizerAssert(_asyncRequests.isEmpty());
- }
- catch(java.lang.Exception ex)
- {
- }
- finally
- {
- super.finalize();
- }
- }
-
- private static final int StateNotInitialized = 0;
- private static final int StateNotValidated = 1;
- private static final int StateActive = 2;
- private static final int StateHolding = 3;
- private static final int StateClosing = 4;
- private static final int StateClosingPending = 5;
- private static final int StateClosed = 6;
- private static final int StateFinished = 7;
-
- private void setState(int state, LocalException ex)
- {
- //
- // If setState() is called with an exception, then only closed
- // and closing states are permissible.
- //
- assert state >= StateClosing;
-
- if(_state == state) // Don't switch twice.
- {
- return;
- }
-
- if(_exception == null)
- {
- //
- // If we are in closed state, an exception must be set.
- //
- assert (_state != StateClosed);
-
- _exception = ex;
-
- //
- // We don't warn if we are not validated.
- //
- if(_warn && _validated)
- {
- //
- // Don't warn about certain expected exceptions.
- //
- if(!(_exception instanceof CloseConnectionException ||
- _exception instanceof ForcedCloseConnectionException ||
- _exception instanceof ConnectionTimeoutException ||
- _exception instanceof CommunicatorDestroyedException ||
- _exception instanceof ObjectAdapterDeactivatedException ||
- (_exception instanceof ConnectionLostException && _state >= StateClosing)))
- {
- warning("connection exception", _exception);
- }
- }
- }
-
- //
- // We must set the new state before we notify requests of any
- // exceptions. Otherwise new requests may retry on a
- // connection that is not yet marked as closed or closing.
- //
- setState(state);
- }
-
- private void setState(int state)
- {
- //
- // We don't want to send close connection messages if the endpoint
- // only supports oneway transmission from client to server.
- //
- if(_endpoint.datagram() && state == StateClosing)
- {
- state = StateClosed;
- }
-
- //
- // Skip graceful shutdown if we are destroyed before validation.
- //
- if(_state <= StateNotValidated && state == StateClosing)
- {
- state = StateClosed;
- }
-
- if(_state == state) // Don't switch twice.
- {
- return;
- }
-
- try
- {
- switch(state)
- {
- case StateNotInitialized:
- {
- assert (false);
- break;
- }
-
- case StateNotValidated:
- {
- if(_state != StateNotInitialized)
- {
- assert (_state == StateClosed);
- return;
- }
- break;
- }
-
- case StateActive:
- {
- //
- // Can only switch from holding or not validated to
- // active.
- //
- if(_state != StateHolding && _state != StateNotValidated)
- {
- return;
- }
- _threadPool.register(this, IceInternal.SocketOperation.Read);
- break;
- }
-
- case StateHolding:
- {
- //
- // Can only switch from active or not validated to
- // holding.
- //
- if(_state != StateActive && _state != StateNotValidated)
- {
- return;
- }
- if(_state == StateActive)
- {
- _threadPool.unregister(this, IceInternal.SocketOperation.Read);
- }
- break;
- }
-
- case StateClosing:
- case StateClosingPending:
- {
- //
- // Can't change back from closing pending.
- //
- if(_state >= StateClosingPending)
- {
- return;
- }
- break;
- }
-
- case StateClosed:
- {
- if(_state == StateFinished)
- {
- return;
- }
-
- //
- // Don't need to close now for connections so only close the transceiver
- // if the selector request it.
- //
- if(_threadPool.finish(this, false))
- {
- _transceiver.close();
- }
- break;
- }
-
- case StateFinished:
- {
- assert (_state == StateClosed);
- _communicator = null;
- break;
- }
- }
- }
- catch(Ice.LocalException ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = "unexpected connection exception:\n " + _desc + "\n" + sw.toString();
- _instance.initializationData().logger.error(s);
- }
-
- //
- // We only register with the connection monitor if our new state
- // is StateActive. Otherwise we unregister with the connection
- // monitor, but only if we were registered before, i.e., if our
- // old state was StateActive.
- //
- if(_monitor != null)
- {
- if(state == StateActive)
- {
- if(_acmLastActivity > 0)
- {
- _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
- }
- _monitor.add(this);
- }
- else if(_state == StateActive)
- {
- _monitor.remove(this);
- }
- }
-
- if(_instance.initializationData().observer != null)
- {
- Ice.Instrumentation.ConnectionState oldState = toConnectionState(_state);
- Ice.Instrumentation.ConnectionState newState = toConnectionState(state);
- if(oldState != newState)
- {
- _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(),
- _endpoint,
- newState,
- _observer);
- if(_observer != null)
- {
- _observer.attach();
- }
- else
- {
- _writeStreamPos = -1;
- _readStreamPos = -1;
- }
- }
- if(_observer != null && state == StateClosed && _exception != null)
- {
- if(!(_exception instanceof CloseConnectionException ||
- _exception instanceof ForcedCloseConnectionException ||
- _exception instanceof ConnectionTimeoutException ||
- _exception instanceof CommunicatorDestroyedException ||
- _exception instanceof ObjectAdapterDeactivatedException ||
- (_exception instanceof ConnectionLostException && _state >= StateClosing)))
- {
- _observer.failed(_exception.ice_name());
- }
- }
- }
- _state = state;
-
- notifyAll();
-
- if(_state == StateClosing && _dispatchCount == 0)
- {
- try
- {
- initiateShutdown();
- }
- catch(LocalException ex)
- {
- setState(StateClosed, ex);
- }
- }
- }
-
- private void initiateShutdown()
- {
- assert (_state == StateClosing);
- assert (_dispatchCount == 0);
-
- if(_shutdownInitiated)
- {
- return;
- }
- _shutdownInitiated = true;
-
- if(!_endpoint.datagram())
- {
- //
- // Before we shut down, we send a close connection message.
- //
- IceInternal.BasicStream os = new IceInternal.BasicStream(_instance,
- IceInternal.Protocol.currentProtocolEncoding);
- os.writeBlob(IceInternal.Protocol.magic);
- IceInternal.Protocol.currentProtocol.__write(os);
- IceInternal.Protocol.currentProtocolEncoding.__write(os);
- os.writeByte(IceInternal.Protocol.closeConnectionMsg);
- os.writeByte((byte) 0); // compression status: always report 0 for
- // CloseConnection in Java.
- os.writeInt(IceInternal.Protocol.headerSize); // Message size.
-
- if((sendMessage(new OutgoingMessage(os, false, false)) & IceInternal.AsyncStatus.Sent) > 0)
- {
- setState(StateClosingPending);
-
- //
- // Notify the the transceiver of the graceful connection
- // closure.
- //
- int op = _transceiver.closing(true, _exception);
- if(op != 0)
- {
- scheduleTimeout(op);
- _threadPool.register(this, op);
- }
- }
- }
- }
-
- private void heartbeat()
- {
- assert (_state == StateActive);
-
- if(!_endpoint.datagram())
- {
- IceInternal.BasicStream os = new IceInternal.BasicStream(_instance,
- IceInternal.Protocol.currentProtocolEncoding);
- os.writeBlob(IceInternal.Protocol.magic);
- IceInternal.Protocol.currentProtocol.__write(os);
- IceInternal.Protocol.currentProtocolEncoding.__write(os);
- os.writeByte(IceInternal.Protocol.validateConnectionMsg);
- os.writeByte((byte) 0);
- os.writeInt(IceInternal.Protocol.headerSize); // Message size.
-
- try
- {
- OutgoingMessage message = new OutgoingMessage(os, false, false);
- sendMessage(message);
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- assert (_exception != null);
- }
- }
- }
-
- private boolean initialize(int operation)
- {
- int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), _hasMoreData);
- if(s != IceInternal.SocketOperation.None)
- {
- scheduleTimeout(s);
- _threadPool.update(this, operation, s);
- return false;
- }
-
- //
- // Update the connection description once the transceiver is
- // initialized.
- //
- _desc = _transceiver.toString();
- _initialized = true;
- setState(StateNotValidated);
-
- return true;
- }
-
- private boolean validate(int operation)
- {
- if(!_endpoint.datagram()) // Datagram connections are always implicitly
- // validated.
- {
- if(_adapter != null) // The server side has the active role for
- // connection validation.
- {
- if(_writeStream.isEmpty())
- {
- _writeStream.writeBlob(IceInternal.Protocol.magic);
- IceInternal.Protocol.currentProtocol.__write(_writeStream);
- IceInternal.Protocol.currentProtocolEncoding.__write(_writeStream);
- _writeStream.writeByte(IceInternal.Protocol.validateConnectionMsg);
- _writeStream.writeByte((byte) 0); // Compression status
- // (always zero for
- // validate connection).
- _writeStream.writeInt(IceInternal.Protocol.headerSize); // Message
- // size.
- IceInternal.TraceUtil.traceSend(_writeStream, _logger, _traceLevels);
- _writeStream.prepareWrite();
- }
-
- if(_observer != null)
- {
- observerStartWrite(_writeStream.getBuffer());
- }
-
- if(_writeStream.pos() != _writeStream.size())
- {
- int op = write(_writeStream.getBuffer());
- if(op != 0)
- {
- scheduleTimeout(op);
- _threadPool.update(this, operation, op);
- return false;
- }
- }
-
- if(_observer != null)
- {
- observerFinishWrite(_writeStream.getBuffer());
- }
- }
- else
- // The client side has the passive role for connection validation.
- {
- if(_readStream.isEmpty())
- {
- _readStream.resize(IceInternal.Protocol.headerSize, true);
- _readStream.pos(0);
- }
-
- if(_observer != null)
- {
- observerStartRead(_readStream.getBuffer());
- }
-
- if(_readStream.pos() != _readStream.size())
- {
- int op = read(_readStream.getBuffer());
- if(op != 0)
- {
- scheduleTimeout(op);
- _threadPool.update(this, operation, op);
- return false;
- }
- }
-
- if(_observer != null)
- {
- observerFinishRead(_readStream.getBuffer());
- }
-
- assert (_readStream.pos() == IceInternal.Protocol.headerSize);
- _readStream.pos(0);
- byte[] m = _readStream.readBlob(4);
- if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] ||
- m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
- {
- BadMagicException ex = new BadMagicException();
- ex.badMagic = m;
- throw ex;
- }
-
- _readProtocol.__read(_readStream);
- IceInternal.Protocol.checkSupportedProtocol(_readProtocol);
-
- _readProtocolEncoding.__read(_readStream);
- IceInternal.Protocol.checkSupportedProtocolEncoding(_readProtocolEncoding);
-
- byte messageType = _readStream.readByte();
- if(messageType != IceInternal.Protocol.validateConnectionMsg)
- {
- throw new ConnectionNotValidatedException();
- }
- _readStream.readByte(); // Ignore compression status for
- // validate connection.
- int size = _readStream.readInt();
- if(size != IceInternal.Protocol.headerSize)
- {
- throw new IllegalMessageSizeException();
- }
- IceInternal.TraceUtil.traceRecv(_readStream, _logger, _traceLevels);
-
- _validated = true;
- }
- }
-
- _writeStream.resize(0, false);
- _writeStream.pos(0);
-
- _readStream.resize(IceInternal.Protocol.headerSize, true);
- _readStream.pos(0);
- _readHeader = true;
-
- if(_instance.traceLevels().network >= 1)
- {
- StringBuffer s = new StringBuffer();
- if(_endpoint.datagram())
- {
- s.append("starting to ");
- s.append(_connector != null ? "send" : "receive");
- s.append(" ");
- s.append(_endpoint.protocol());
- s.append(" messages\n");
- s.append(_transceiver.toDetailedString());
- }
- else
- {
- s.append(_connector != null ? "established" : "accepted");
- s.append(" ");
- s.append(_endpoint.protocol());
- s.append(" connection\n");
- s.append(toString());
- }
- _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.toString());
- }
-
- return true;
- }
-
- private int sendNextMessage(java.util.List<OutgoingMessage> callbacks)
- {
- if(_sendStreams.isEmpty())
- {
- return IceInternal.SocketOperation.None;
- }
- else if(_state == StateClosingPending && _writeStream.pos() == 0)
- {
- // Message wasn't sent, empty the _writeStream, we're not going to
- // send more data.
- OutgoingMessage message = _sendStreams.getFirst();
- _writeStream.swap(message.stream);
- return IceInternal.SocketOperation.None;
- }
-
- assert (!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());
- try
- {
- while(true)
- {
- //
- // Notify the message that it was sent.
- //
- OutgoingMessage message = _sendStreams.getFirst();
- _writeStream.swap(message.stream);
- if(message.sent())
- {
- callbacks.add(message);
- }
- _sendStreams.removeFirst();
-
- //
- // If there's nothing left to send, we're done.
- //
- if(_sendStreams.isEmpty())
- {
- break;
- }
-
- //
- // If we are in the closed state or if the close is
- // pending, don't continue sending.
- //
- // This can occur if parseMessage (called before
- // sendNextMessage by message()) closes the connection.
- //
- if(_state >= StateClosingPending)
- {
- return IceInternal.SocketOperation.None;
- }
-
- //
- // Otherwise, prepare the next message stream for writing.
- //
- message = _sendStreams.getFirst();
- assert (!message.prepared);
- IceInternal.BasicStream stream = message.stream;
-
- message.stream = doCompress(stream, message.compress);
- message.stream.prepareWrite();
- message.prepared = true;
-
- if(message.outAsync != null)
- {
- IceInternal.TraceUtil.trace("sending asynchronous request", stream, _logger, _traceLevels);
- }
- else
- {
- IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels);
- }
- _writeStream.swap(message.stream);
-
- //
- // Send the message.
- //
- if(_observer != null)
- {
- observerStartWrite(_writeStream.getBuffer());
- }
- if(_writeStream.pos() != _writeStream.size())
- {
- int op = write(_writeStream.getBuffer());
- if(op != 0)
- {
- return op;
- }
- }
- if(_observer != null)
- {
- observerFinishWrite(_writeStream.getBuffer());
- }
- }
-
- //
- // If all the messages were sent and we are in the closing state, we
- // schedule the close timeout to wait for the peer to close the
- // connection.
- //
- if(_state == StateClosing && _shutdownInitiated)
- {
- setState(StateClosingPending);
- int op = _transceiver.closing(true, _exception);
- if(op != 0)
- {
- return op;
- }
- }
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- }
- return IceInternal.SocketOperation.None;
- }
-
- private int sendMessage(OutgoingMessage message)
- {
- assert (_state < StateClosed);
-
- if(!_sendStreams.isEmpty())
- {
- message.adopt();
- _sendStreams.addLast(message);
- return IceInternal.AsyncStatus.Queued;
- }
-
- //
- // Attempt to send the message without blocking. If the send blocks, we
- // register the connection with the selector thread.
- //
-
- assert (!message.prepared);
-
- IceInternal.BasicStream stream = message.stream;
-
- message.stream = doCompress(stream, message.compress);
- message.stream.prepareWrite();
- message.prepared = true;
- int op;
-
- if(message.outAsync != null)
- {
- IceInternal.TraceUtil.trace("sending asynchronous request", stream, _logger, _traceLevels);
- }
- else
- {
- IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels);
- }
-
- //
- // Send the message without blocking.
- //
- if(_observer != null)
- {
- observerStartWrite(message.stream.getBuffer());
- }
- op = write(message.stream.getBuffer());
- if(op == 0)
- {
- if(_observer != null)
- {
- observerFinishWrite(message.stream.getBuffer());
- }
-
- int status = IceInternal.AsyncStatus.Sent;
- if(message.sent())
- {
- status |= IceInternal.AsyncStatus.InvokeSentCallback;
- }
-
- if(_acmLastActivity > 0)
- {
- _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
- }
- return status;
- }
-
- message.adopt();
-
- _writeStream.swap(message.stream);
- _sendStreams.addLast(message);
- scheduleTimeout(op);
- _threadPool.register(this, op);
- return IceInternal.AsyncStatus.Queued;
- }
-
- private IceInternal.BasicStream doCompress(IceInternal.BasicStream uncompressed, boolean compress)
- {
- boolean compressionSupported = false;
- if(compress)
- {
- //
- // Don't check whether compression support is available unless the
- // proxy is configured for compression.
- //
- compressionSupported = IceInternal.BasicStream.compressible();
- }
-
- if(compressionSupported && uncompressed.size() >= 100)
- {
- //
- // Do compression.
- //
- IceInternal.BasicStream cstream = uncompressed.compress(IceInternal.Protocol.headerSize, _compressionLevel);
- if(cstream != null)
- {
- //
- // Set compression status.
- //
- cstream.pos(9);
- cstream.writeByte((byte) 2);
-
- //
- // Write the size of the compressed stream into the header.
- //
- cstream.pos(10);
- cstream.writeInt(cstream.size());
-
- //
- // Write the compression status and size of the compressed
- // stream into the header of the uncompressed stream -- we need
- // this to trace requests correctly.
- //
- uncompressed.pos(9);
- uncompressed.writeByte((byte) 2);
- uncompressed.writeInt(cstream.size());
-
- return cstream;
- }
- }
-
- uncompressed.pos(9);
- uncompressed.writeByte((byte) (compressionSupported ? 1 : 0));
-
- //
- // Not compressed, fill in the message size.
- //
- uncompressed.pos(10);
- uncompressed.writeInt(uncompressed.size());
-
- return uncompressed;
- }
-
- private static class MessageInfo
- {
- MessageInfo(IceInternal.BasicStream stream)
- {
- this.stream = stream;
- }
-
- IceInternal.BasicStream stream;
- int invokeNum;
- int requestId;
- byte compress;
- IceInternal.ServantManager servantManager;
- ObjectAdapter adapter;
- IceInternal.OutgoingAsync outAsync;
- ConnectionCallback heartbeatCallback;
- int messageDispatchCount;
- }
-
- private int parseMessage(MessageInfo info)
- {
- assert (_state > StateNotValidated && _state < StateClosed);
-
- _readStream.swap(info.stream);
- _readStream.resize(IceInternal.Protocol.headerSize, true);
- _readStream.pos(0);
- _readHeader = true;
-
- assert (info.stream.pos() == info.stream.size());
-
- //
- // Connection is validated on first message. This is only used by
- // setState() to check wether or not we can print a connection
- // warning (a client might close the connection forcefully if the
- // connection isn't validated).
- //
- _validated = true;
-
- try
- {
- //
- // We don't need to check magic and version here. This has already
- // been done by the ThreadPool which provides us with the stream.
- //
- info.stream.pos(8);
- byte messageType = info.stream.readByte();
- info.compress = info.stream.readByte();
- if(info.compress == (byte) 2)
- {
- if(IceInternal.BasicStream.compressible())
- {
- info.stream = info.stream.uncompress(IceInternal.Protocol.headerSize);
- }
- else
- {
- FeatureNotSupportedException ex = new FeatureNotSupportedException();
- ex.unsupportedFeature = "Cannot uncompress compressed message: "
- + "org.apache.tools.bzip2.CBZip2OutputStream was not found";
- throw ex;
- }
- }
- info.stream.pos(IceInternal.Protocol.headerSize);
-
- switch(messageType)
- {
- case IceInternal.Protocol.closeConnectionMsg:
- {
- IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
- if(_endpoint.datagram())
- {
- if(_warn)
- {
- _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
- }
- }
- else
- {
- setState(StateClosingPending, new CloseConnectionException());
-
- //
- // Notify the the transceiver of the graceful connection
- // closure.
- //
- int op = _transceiver.closing(false, _exception);
- if(op != 0)
- {
- return op;
- }
- setState(StateClosed);
- }
- break;
- }
-
- case IceInternal.Protocol.requestMsg:
- {
- if(_state >= StateClosing)
- {
- IceInternal.TraceUtil.trace("received request during closing\n"
- + "(ignored by server, client will retry)", info.stream, _logger,
- _traceLevels);
- }
- else
- {
- IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
- info.requestId = info.stream.readInt();
- info.invokeNum = 1;
- info.servantManager = _servantManager;
- info.adapter = _adapter;
- ++info.messageDispatchCount;
- }
- break;
- }
-
- case IceInternal.Protocol.requestBatchMsg:
- {
- if(_state >= StateClosing)
- {
- IceInternal.TraceUtil.trace("received batch request during closing\n"
- + "(ignored by server, client will retry)", info.stream, _logger,
- _traceLevels);
- }
- else
- {
- IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
- info.invokeNum = info.stream.readInt();
- if(info.invokeNum < 0)
- {
- info.invokeNum = 0;
- throw new UnmarshalOutOfBoundsException();
- }
- info.servantManager = _servantManager;
- info.adapter = _adapter;
- info.messageDispatchCount += info.invokeNum;
- }
- break;
- }
-
- case IceInternal.Protocol.replyMsg:
- {
-
- IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
- info.requestId = info.stream.readInt();
- IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId);
- if(outAsync != null && outAsync.completed(info.stream))
- {
- info.outAsync = outAsync;
- ++info.messageDispatchCount;
- }
- notifyAll(); // Notify threads blocked in close(false)
- break;
- }
-
- case IceInternal.Protocol.validateConnectionMsg:
- {
- IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
- if(_callback != null)
- {
- info.heartbeatCallback = _callback;
- ++info.messageDispatchCount;
- }
- break;
- }
-
- default:
- {
- IceInternal.TraceUtil.trace("received unknown message\n(invalid, closing connection)", info.stream,
- _logger, _traceLevels);
- throw new UnknownMessageException();
- }
- }
- }
- catch(LocalException ex)
- {
- if(_endpoint.datagram())
- {
- if(_warn)
- {
- _logger.warning("datagram connection exception:\n" + ex + '\n' + _desc);
- }
- }
- else
- {
- setState(StateClosed, ex);
- }
- }
-
- return _state == StateHolding ? IceInternal.SocketOperation.None : IceInternal.SocketOperation.Read;
- }
-
- private void invokeAll(IceInternal.BasicStream stream, int invokeNum, int requestId, byte compress,
- IceInternal.ServantManager servantManager, ObjectAdapter adapter)
- {
- //
- // Note: In contrast to other private or protected methods, this
- // operation must be called *without* the mutex locked.
- //
-
- IceInternal.Incoming in = null;
- try
- {
- while(invokeNum > 0)
- {
-
- //
- // Prepare the invocation.
- //
- boolean response = !_endpoint.datagram() && requestId != 0;
- in = getIncoming(adapter, response, compress, requestId);
-
- //
- // Dispatch the invocation.
- //
- in.invoke(servantManager, stream);
-
- --invokeNum;
-
- reclaimIncoming(in);
- in = null;
- }
-
- stream.clear();
- }
- catch(LocalException ex)
- {
- invokeException(requestId, ex, invokeNum);
- }
- catch(java.lang.AssertionError ex) // Upon assertion, we print the stack
- // trace.
- {
- UnknownException uex = new UnknownException(ex);
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- uex.unknown = sw.toString();
- _logger.error(uex.unknown);
- invokeException(requestId, uex, invokeNum);
- }
- catch(java.lang.OutOfMemoryError ex)
- {
- UnknownException uex = new UnknownException(ex);
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- uex.unknown = sw.toString();
- _logger.error(uex.unknown);
- invokeException(requestId, uex, invokeNum);
- }
- finally
- {
- if(in != null)
- {
- reclaimIncoming(in);
- }
- }
- }
-
- private void scheduleTimeout(int status)
- {
- int timeout;
- if(_state < StateActive)
- {
- IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideConnectTimeout)
- {
- timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
- }
- else
- {
- timeout = _endpoint.timeout();
- }
- }
- else if(_state < StateClosingPending)
- {
- if(_readHeader) // No timeout for reading the header.
- {
- status &= ~IceInternal.SocketOperation.Read;
- }
- timeout = _endpoint.timeout();
- }
- else
- {
- IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideCloseTimeout)
- {
- timeout = defaultsAndOverrides.overrideCloseTimeoutValue;
- }
- else
- {
- timeout = _endpoint.timeout();
- }
- }
-
- if(timeout < 0)
- {
- return;
- }
-
- try
- {
- if((status & IceInternal.SocketOperation.Read) != 0)
- {
- if(_readTimeoutFuture != null)
- {
- _readTimeoutFuture.cancel(false);
- }
- _readTimeoutFuture = _timer.schedule(_readTimeout, timeout, java.util.concurrent.TimeUnit.MILLISECONDS);
- }
- if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0)
- {
- if(_writeTimeoutFuture != null)
- {
- _writeTimeoutFuture.cancel(false);
- }
- _writeTimeoutFuture = _timer.schedule(_writeTimeout, timeout,
- java.util.concurrent.TimeUnit.MILLISECONDS);
- }
- }
- catch(Throwable ex)
- {
- assert (false);
- }
- }
-
- private void unscheduleTimeout(int status)
- {
- if((status & IceInternal.SocketOperation.Read) != 0 && _readTimeoutFuture != null)
- {
- _readTimeoutFuture.cancel(false);
- _readTimeoutFuture = null;
- }
- if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0 &&
- _writeTimeoutFuture != null)
- {
- _writeTimeoutFuture.cancel(false);
- _writeTimeoutFuture = null;
- }
- }
-
- private ConnectionInfo initConnectionInfo()
- {
- if(_info != null)
- {
- return _info;
- }
-
- ConnectionInfo info = _transceiver.getInfo();
- info.connectionId = _endpoint.connectionId();
- info.adapterName = _adapter != null ? _adapter.getName() : "";
- info.incoming = _connector == null;
- if(_state > StateNotInitialized)
- {
- _info = info; // Cache the connection information only if
- // initialized.
- }
- return info;
- }
-
- private Ice.Instrumentation.ConnectionState toConnectionState(int state)
- {
- return connectionStateMap[state];
- }
-
- private void warning(String msg, java.lang.Exception ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- String s = msg + ":\n" + _desc + "\n" + sw.toString();
- _logger.warning(s);
- }
-
- private void observerStartRead(IceInternal.Buffer buf)
- {
- if(_readStreamPos >= 0)
- {
- assert (!buf.empty());
- _observer.receivedBytes(buf.b.position() - _readStreamPos);
- }
- _readStreamPos = buf.empty() ? -1 : buf.b.position();
- }
-
- private void observerFinishRead(IceInternal.Buffer buf)
- {
- if(_readStreamPos == -1)
- {
- return;
- }
- assert (buf.b.position() >= _readStreamPos);
- _observer.receivedBytes(buf.b.position() - _readStreamPos);
- _readStreamPos = -1;
- }
-
- private void observerStartWrite(IceInternal.Buffer buf)
- {
- if(_writeStreamPos >= 0)
- {
- assert (!buf.empty());
- _observer.sentBytes(buf.b.position() - _writeStreamPos);
- }
- _writeStreamPos = buf.empty() ? -1 : buf.b.position();
- }
-
- private void observerFinishWrite(IceInternal.Buffer buf)
- {
- if(_writeStreamPos == -1)
- {
- return;
- }
- if(buf.b.position() > _writeStreamPos)
- {
- _observer.sentBytes(buf.b.position() - _writeStreamPos);
- }
- _writeStreamPos = -1;
- }
-
- private IceInternal.Incoming getIncoming(ObjectAdapter adapter, boolean response, byte compress, int requestId)
- {
- IceInternal.Incoming in = null;
-
- if(_cacheBuffers > 0)
- {
- synchronized(_incomingCacheMutex)
- {
- if(_incomingCache == null)
- {
- in = new IceInternal.Incoming(_instance, this, this, adapter, response, compress, requestId);
- }
- else
- {
- in = _incomingCache;
- _incomingCache = _incomingCache.next;
- in.reset(_instance, this, this, adapter, response, compress, requestId);
- in.next = null;
- }
- }
- }
- else
- {
- in = new IceInternal.Incoming(_instance, this, this, adapter, response, compress, requestId);
- }
-
- return in;
- }
-
- private void reclaimIncoming(IceInternal.Incoming in)
- {
- if(_cacheBuffers > 0)
- {
- synchronized(_incomingCacheMutex)
- {
- in.next = _incomingCache;
- _incomingCache = in;
- //
- // Clear references to Ice objects as soon as possible.
- //
- _incomingCache.reclaim();
- }
- }
- }
-
- private void reap()
- {
- if(_monitor != null)
- {
- _monitor.reap(this);
- }
- if(_observer != null)
- {
- _observer.detach();
- }
- }
-
- private void waitBatchStreamInUse()
- {
- //
- // This is similar to a mutex lock in that the flag is
- // only true for a short time period. As such we don't permit the
- // wait to be interrupted. Instead the interrupted status is saved
- // and restored.
- //
- boolean interrupted = false;
- while(_batchStreamInUse && _exception == null)
- {
- try
- {
- wait();
- }
- catch(InterruptedException e)
- {
- interrupted = true;
- }
- }
- //
- // Restore the interrupted flag if we were interrupted.
- //
- if(interrupted)
- {
- Thread.currentThread().interrupt();
- }
- }
-
- private int read(IceInternal.Buffer buf)
- {
- int start = buf.b.position();
- int op = _transceiver.read(buf, _hasMoreData);
- if(_instance.traceLevels().network >= 3 && buf.b.position() != start)
- {
- StringBuffer s = new StringBuffer("received ");
- if(_endpoint.datagram())
- {
- s.append(buf.b.limit());
- }
- else
- {
- s.append(buf.b.position() - start);
- s.append(" of ");
- s.append(buf.b.limit() - start);
- }
- s.append(" bytes via ");
- s.append(_endpoint.protocol());
- s.append("\n");
- s.append(toString());
- _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.toString());
- }
- return op;
- }
-
- private int write(IceInternal.Buffer buf)
- {
- int start = buf.b.position();
- int op = _transceiver.write(buf);
- if(_instance.traceLevels().network >= 3 && buf.b.position() != start)
- {
- StringBuffer s = new StringBuffer("sent ");
- s.append(buf.b.position() - start);
- if(!_endpoint.datagram())
- {
- s.append(" of ");
- s.append(buf.b.limit() - start);
- }
- s.append(" bytes via ");
- s.append(_endpoint.protocol());
- s.append("\n");
- s.append(toString());
- _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.toString());
- }
- return op;
- }
-
- private static class OutgoingMessage
- {
- OutgoingMessage(IceInternal.BasicStream stream, boolean compress, boolean adopt)
- {
- this.stream = stream;
- this.compress = compress;
- this.adopt = adopt;
- this.requestId = 0;
- }
-
- OutgoingMessage(IceInternal.OutgoingAsyncBase out, IceInternal.BasicStream stream, boolean compress,
- int requestId)
- {
- this.stream = stream;
- this.compress = compress;
- this.outAsync = out;
- this.requestId = requestId;
- }
-
- public void canceled()
- {
- assert (outAsync != null);
- outAsync = null;
- }
-
- public void adopt()
- {
- if(adopt)
- {
- IceInternal.BasicStream stream = new IceInternal.BasicStream(this.stream.instance(),
- IceInternal.Protocol.currentProtocolEncoding);
- stream.swap(this.stream);
- this.stream = stream;
- adopt = false;
- }
- }
-
- public boolean sent()
- {
- if(outAsync != null)
- {
- return outAsync.sent();
- }
- return false;
- }
-
- public void completed(Ice.LocalException ex)
- {
- if(outAsync != null && outAsync.completed(ex))
- {
- outAsync.invokeCompleted();
- }
- }
-
- public IceInternal.BasicStream stream;
- public IceInternal.OutgoingAsyncBase outAsync;
- public boolean compress;
- public int requestId;
- boolean adopt;
- boolean prepared;
- }
-
- private Communicator _communicator;
- private final IceInternal.Instance _instance;
- private IceInternal.ACMMonitor _monitor;
- private final IceInternal.Transceiver _transceiver;
- private String _desc;
- private final String _type;
- private final IceInternal.Connector _connector;
- private final IceInternal.EndpointI _endpoint;
-
- private ObjectAdapter _adapter;
- private IceInternal.ServantManager _servantManager;
-
- private final boolean _dispatcher;
- private final Logger _logger;
- private final IceInternal.TraceLevels _traceLevels;
- private final IceInternal.ThreadPool _threadPool;
-
- private final java.util.concurrent.ScheduledExecutorService _timer;
- private final Runnable _writeTimeout;
- private java.util.concurrent.Future<?> _writeTimeoutFuture;
- private final Runnable _readTimeout;
- private java.util.concurrent.Future<?> _readTimeoutFuture;
-
- private StartCallback _startCallback = null;
-
- private final boolean _warn;
- private final boolean _warnUdp;
-
- private long _acmLastActivity;
-
- private final int _compressionLevel;
-
- private int _nextRequestId;
-
- private java.util.Map<Integer, IceInternal.OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, IceInternal.OutgoingAsync>();
-
- private LocalException _exception;
-
- private boolean _batchAutoFlush;
- private IceInternal.BasicStream _batchStream;
- private boolean _batchStreamInUse;
- private int _batchRequestNum;
- private boolean _batchRequestCompress;
- private int _batchMarker;
-
- private java.util.LinkedList<OutgoingMessage> _sendStreams = new java.util.LinkedList<OutgoingMessage>();
-
- private IceInternal.BasicStream _readStream;
- private boolean _readHeader;
- private IceInternal.BasicStream _writeStream;
-
- private Ice.Instrumentation.ConnectionObserver _observer;
- private int _readStreamPos;
- private int _writeStreamPos;
-
- private int _dispatchCount;
-
- private int _state; // The current state.
- private boolean _shutdownInitiated = false;
- private boolean _initialized = false;
- private boolean _validated = false;
-
- private IceInternal.Incoming _incomingCache;
- private java.lang.Object _incomingCacheMutex = new java.lang.Object();
-
- private Ice.ProtocolVersion _readProtocol = new Ice.ProtocolVersion();
- private Ice.EncodingVersion _readProtocolEncoding = new Ice.EncodingVersion();
-
- private int _cacheBuffers;
-
- private Ice.ConnectionInfo _info;
-
- private ConnectionCallback _callback;
-
- private static Ice.Instrumentation.ConnectionState connectionStateMap[] = {
- Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized
- Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotValidated
- Ice.Instrumentation.ConnectionState.ConnectionStateActive, // StateActive
- Ice.Instrumentation.ConnectionState.ConnectionStateHolding, // StateHolding
- Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosing
- Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosingPending
- Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed
- Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished
- };
-
-}