summaryrefslogtreecommitdiff
path: root/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java
diff options
context:
space:
mode:
Diffstat (limited to 'java-compat/src/Ice/src/main/java/Ice/ConnectionI.java')
-rw-r--r--java-compat/src/Ice/src/main/java/Ice/ConnectionI.java3030
1 files changed, 3030 insertions, 0 deletions
diff --git a/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java b/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java
new file mode 100644
index 00000000000..cac2a6fb3d2
--- /dev/null
+++ b/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java
@@ -0,0 +1,3030 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package Ice;
+
+import java.util.concurrent.Callable;
+
+public final class ConnectionI extends IceInternal.EventHandler
+ implements Connection, IceInternal.ResponseHandler, IceInternal.CancellationHandler
+{
+ public interface StartCallback
+ {
+ void connectionStartCompleted(ConnectionI connection);
+
+ void connectionStartFailed(ConnectionI connection, Ice.LocalException ex);
+ }
+
+ private class TimeoutCallback implements Runnable
+ {
+ @Override
+ public void run()
+ {
+ timedOut();
+ }
+ }
+
+ public void start(StartCallback callback)
+ {
+ try
+ {
+ synchronized(this)
+ {
+ // The connection might already be closed if the communicator
+ // was destroyed.
+ if(_state >= StateClosed)
+ {
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
+ }
+
+ if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None))
+ {
+ _startCallback = callback;
+ return;
+ }
+
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ exception(ex);
+ callback.connectionStartFailed(this, _exception);
+ return;
+ }
+
+ callback.connectionStartCompleted(this);
+ }
+
+ public void startAndWait() throws InterruptedException
+ {
+ try
+ {
+ synchronized(this)
+ {
+ // The connection might already be closed if the communicator
+ // was destroyed.
+ if(_state >= StateClosed)
+ {
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
+ }
+
+ if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None))
+ {
+ while(_state <= StateNotValidated)
+ {
+ wait();
+ }
+
+ if(_state >= StateClosing)
+ {
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
+ }
+ }
+
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ exception(ex);
+ waitUntilFinished();
+ }
+ }
+
+ public synchronized void activate()
+ {
+ if(_state <= StateNotValidated)
+ {
+ return;
+ }
+
+ if(_acmLastActivity > 0)
+ {
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
+ }
+
+ setState(StateActive);
+ }
+
+ public synchronized void hold()
+ {
+ if(_state <= StateNotValidated)
+ {
+ return;
+ }
+
+ setState(StateHolding);
+ }
+
+ // DestructionReason.
+ public final static int ObjectAdapterDeactivated = 0;
+ public final static int CommunicatorDestroyed = 1;
+
+ synchronized public void destroy(int reason)
+ {
+ switch(reason)
+ {
+ case ObjectAdapterDeactivated:
+ {
+ setState(StateClosing, new ObjectAdapterDeactivatedException());
+ break;
+ }
+
+ case CommunicatorDestroyed:
+ {
+ setState(StateClosing, new CommunicatorDestroyedException());
+ break;
+ }
+ }
+ }
+
+ @Override
+ synchronized public void close(boolean force)
+ {
+ if(Thread.interrupted())
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+
+ if(force)
+ {
+ setState(StateClosed, new ForcedCloseConnectionException());
+ }
+ else
+ {
+ //
+ // If we do a graceful shutdown, then we wait until all
+ // outstanding requests have been completed. Otherwise,
+ // the CloseConnectionException will cause all outstanding
+ // requests to be retried, regardless of whether the
+ // server has processed them or not.
+ //
+ while(!_asyncRequests.isEmpty())
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ }
+
+ setState(StateClosing, new CloseConnectionException());
+ }
+ }
+
+ public synchronized boolean isActiveOrHolding()
+ {
+ return _state > StateNotValidated && _state < StateClosing;
+ }
+
+ public synchronized boolean isFinished()
+ {
+ if(_state != StateFinished || _dispatchCount != 0)
+ {
+ return false;
+ }
+
+ assert (_state == StateFinished);
+ return true;
+ }
+
+ public synchronized void throwException()
+ {
+ if(_exception != null)
+ {
+ assert (_state >= StateClosing);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
+ }
+ }
+
+ public synchronized void waitUntilHolding() throws InterruptedException
+ {
+ while(_state < StateHolding || _dispatchCount > 0)
+ {
+ wait();
+ }
+ }
+
+ public synchronized void waitUntilFinished() throws InterruptedException
+ {
+ //
+ // We wait indefinitely until the connection is finished and all
+ // outstanding requests are completed. Otherwise we couldn't
+ // guarantee that there are no outstanding calls when deactivate()
+ // is called on the servant locators.
+ //
+ while(_state < StateFinished || _dispatchCount > 0)
+ {
+ wait();
+ }
+
+ assert (_state == StateFinished);
+
+ //
+ // Clear the OA. See bug 1673 for the details of why this is necessary.
+ //
+ _adapter = null;
+ }
+
+ synchronized public void updateObserver()
+ {
+ if(_state < StateNotValidated || _state > StateClosed)
+ {
+ return;
+ }
+
+ assert (_instance.initializationData().observer != null);
+ _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ toConnectionState(_state),
+ _observer);
+ if(_observer != null)
+ {
+ _observer.attach();
+ }
+ else
+ {
+ _writeStreamPos = -1;
+ _readStreamPos = -1;
+ }
+ }
+
+ synchronized public void monitor(long now, IceInternal.ACMConfig acm)
+ {
+ if(_state != StateActive)
+ {
+ return;
+ }
+
+ //
+ // We send a heartbeat if there was no activity in the last
+ // (timeout / 4) period. Sending a heartbeat sooner than
+ // really needed is safer to ensure that the receiver will
+ // receive in time the heartbeat. Sending the heartbeat if
+ // there was no activity in the last (timeout / 2) period
+ // isn't enough since monitor() is called only every (timeout
+ // / 2) period.
+ //
+ // Note that this doesn't imply that we are sending 4
+ // heartbeats per timeout period because the monitor() method
+ // is sill only called every (timeout / 2) period.
+ //
+ if(acm.heartbeat == ACMHeartbeat.HeartbeatAlways ||
+ (acm.heartbeat != ACMHeartbeat.HeartbeatOff && _writeStream.isEmpty() &&
+ now >= (_acmLastActivity + acm.timeout / 4)))
+ {
+ if(acm.heartbeat != ACMHeartbeat.HeartbeatOnInvocation || _dispatchCount > 0)
+ {
+ heartbeat();
+ }
+ }
+
+ if(_readStream.size() > IceInternal.Protocol.headerSize || !_writeStream.isEmpty())
+ {
+ //
+ // If writing or reading, nothing to do, the connection
+ // timeout will kick-in if writes or reads don't progress.
+ // This check is necessary because the activity timer is
+ // only set when a message is fully read/written.
+ //
+ return;
+ }
+
+ if(acm.close != ACMClose.CloseOff && now >= (_acmLastActivity + acm.timeout))
+ {
+ if(acm.close == ACMClose.CloseOnIdleForceful ||
+ (acm.close != ACMClose.CloseOnIdle && (!_asyncRequests.isEmpty())))
+ {
+ //
+ // Close the connection if we didn't receive a heartbeat in
+ // the last period.
+ //
+ setState(StateClosed, new ConnectionTimeoutException());
+ }
+ else if(acm.close != ACMClose.CloseOnInvocation && _dispatchCount == 0 && _batchRequestQueue.isEmpty() &&
+ _asyncRequests.isEmpty())
+ {
+ //
+ // The connection is idle, close it.
+ //
+ setState(StateClosing, new ConnectionTimeoutException());
+ }
+ }
+ }
+
+ synchronized public int
+ sendAsyncRequest(IceInternal.OutgoingAsyncBase out, boolean compress, boolean response, int batchRequestNum)
+ throws IceInternal.RetryException
+ {
+ final OutputStream os = out.getOs();
+
+ if(_exception != null)
+ {
+ //
+ // If the connection is closed before we even have a chance
+ // to send our request, we always try to send the request
+ // again.
+ //
+ throw new IceInternal.RetryException((Ice.LocalException) _exception.fillInStackTrace());
+ }
+
+ assert (_state > StateNotValidated);
+ assert (_state < StateClosing);
+
+ //
+ // Ensure the message isn't bigger than what we can send with the
+ // transport.
+ //
+ _transceiver.checkSendSize(os.getBuffer());
+
+ //
+ // Notify the request that it's cancelable with this connection.
+ // This will throw if the request is canceled.
+ //
+ out.cancelable(this);
+
+ int requestId = 0;
+ if(response)
+ {
+ //
+ // Create a new unique request ID.
+ //
+ requestId = _nextRequestId++;
+ if(requestId <= 0)
+ {
+ _nextRequestId = 1;
+ requestId = _nextRequestId++;
+ }
+
+ //
+ // Fill in the request ID.
+ //
+ os.pos(IceInternal.Protocol.headerSize);
+ os.writeInt(requestId);
+ }
+ else if(batchRequestNum > 0)
+ {
+ os.pos(IceInternal.Protocol.headerSize);
+ os.writeInt(batchRequestNum);
+ }
+
+ out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
+
+ int status;
+ try
+ {
+ status = sendMessage(new OutgoingMessage(out, os, compress, requestId));
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
+ }
+
+ if(response)
+ {
+ //
+ // Add to the async requests map.
+ //
+ _asyncRequests.put(requestId, out);
+ }
+ return status;
+ }
+
+ public IceInternal.BatchRequestQueue
+ getBatchRequestQueue()
+ {
+ return _batchRequestQueue;
+ }
+
+ @Override
+ public void flushBatchRequests()
+ {
+ end_flushBatchRequests(begin_flushBatchRequests());
+ }
+
+ private static final String __flushBatchRequests_name = "flushBatchRequests";
+
+ @Override
+ public Ice.AsyncResult begin_flushBatchRequests()
+ {
+ return begin_flushBatchRequestsInternal(null);
+ }
+
+ @Override
+ public Ice.AsyncResult begin_flushBatchRequests(Callback cb)
+ {
+ return begin_flushBatchRequestsInternal(cb);
+ }
+
+ @Override
+ public Ice.AsyncResult begin_flushBatchRequests(Callback_Connection_flushBatchRequests cb)
+ {
+ return begin_flushBatchRequestsInternal(cb);
+ }
+
+ @Override
+ public AsyncResult begin_flushBatchRequests(IceInternal.Functional_VoidCallback __responseCb,
+ IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
+ IceInternal.Functional_BoolCallback __sentCb)
+ {
+ return begin_flushBatchRequestsInternal(new IceInternal.Functional_CallbackBase(false, __exceptionCb, __sentCb)
+ {
+ @Override
+ public final void __completed(AsyncResult __result)
+ {
+ try
+ {
+ __result.getConnection().end_flushBatchRequests(__result);
+ }
+ catch(Exception __ex)
+ {
+ __exceptionCb.apply(__ex);
+ }
+ }
+ });
+ }
+
+ private Ice.AsyncResult begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb)
+ {
+ IceInternal.ConnectionFlushBatch result =
+ new IceInternal.ConnectionFlushBatch(this, _communicator, _instance, __flushBatchRequests_name, cb);
+ result.invoke();
+ return result;
+ }
+
+ @Override
+ public void end_flushBatchRequests(AsyncResult ir)
+ {
+ IceInternal.ConnectionFlushBatch r =
+ IceInternal.ConnectionFlushBatch.check(ir, this, __flushBatchRequests_name);
+ r.__wait();
+ }
+
+ @Override
+ synchronized public void setCloseCallback(final CloseCallback callback)
+ {
+ if(_state >= StateClosed)
+ {
+ if(callback != null)
+ {
+ _threadPool.dispatch(new IceInternal.DispatchWorkItem(this)
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ callback.closed(ConnectionI.this);
+ }
+ catch(Exception ex)
+ {
+ _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
+ }
+ }
+ });
+ }
+ }
+ else
+ {
+ _closeCallback = callback;
+ }
+ }
+
+ @Override
+ synchronized public void setHeartbeatCallback(final HeartbeatCallback callback)
+ {
+ _heartbeatCallback = callback;
+ }
+
+ @Override
+ synchronized public void setACM(Ice.IntOptional timeout, Ice.Optional<ACMClose> close,
+ Ice.Optional<ACMHeartbeat> heartbeat)
+ {
+ if(_monitor == null || _state >= StateClosed)
+ {
+ return;
+ }
+
+ if(_state == StateActive)
+ {
+ _monitor.remove(this);
+ }
+ _monitor = _monitor.acm(timeout, close, heartbeat);
+
+ if(_monitor.getACM().timeout <= 0)
+ {
+ _acmLastActivity = -1; // Disable the recording of last activity.
+ }
+ else if(_state == StateActive && _acmLastActivity == -1)
+ {
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
+ }
+
+ if(_state == StateActive)
+ {
+ _monitor.add(this);
+ }
+ }
+
+ @Override
+ synchronized public Ice.ACM getACM()
+ {
+ return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff);
+ }
+
+ @Override
+ synchronized public void asyncRequestCanceled(IceInternal.OutgoingAsyncBase outAsync, Ice.LocalException ex)
+ {
+ if(_state >= StateClosed)
+ {
+ return; // The request has already been or will be shortly notified of the failure.
+ }
+
+ java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
+ while(it.hasNext())
+ {
+ OutgoingMessage o = it.next();
+ if(o.outAsync == outAsync)
+ {
+ if(o.requestId > 0)
+ {
+ _asyncRequests.remove(o.requestId);
+ }
+
+ if(ex instanceof ConnectionTimeoutException)
+ {
+ setState(StateClosed, ex);
+ }
+ else
+ {
+ //
+ // If the request is being sent, don't remove it from the send
+ // streams, it will be removed once the sending is finished.
+ //
+ // Note that since we swapped the message stream to _writeStream
+ // it's fine if the OutgoingAsync output stream is released (and
+ // as long as canceled requests cannot be retried).
+ //
+ o.canceled();
+ if(o != _sendStreams.getFirst())
+ {
+ it.remove();
+ }
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
+ }
+ return;
+ }
+ }
+
+ if(outAsync instanceof IceInternal.OutgoingAsync)
+ {
+ IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync) outAsync;
+ java.util.Iterator<IceInternal.OutgoingAsyncBase> it2 = _asyncRequests.values().iterator();
+ while(it2.hasNext())
+ {
+ if(it2.next() == o)
+ {
+ if(ex instanceof ConnectionTimeoutException)
+ {
+ setState(StateClosed, ex);
+ }
+ else
+ {
+ it2.remove();
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
+ }
+ return;
+ }
+ }
+ }
+ }
+
+ @Override
+ synchronized public void sendResponse(int requestId, OutputStream os, byte compressFlag, boolean amd)
+ {
+ assert (_state > StateNotValidated);
+
+ try
+ {
+ if(--_dispatchCount == 0)
+ {
+ if(_state == StateFinished)
+ {
+ reap();
+ }
+ notifyAll();
+ }
+
+ if(_state >= StateClosed)
+ {
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
+ }
+
+ sendMessage(new OutgoingMessage(os, compressFlag != 0, true));
+
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ initiateShutdown();
+ }
+ }
+ catch(LocalException ex)
+ {
+ setState(StateClosed, ex);
+ }
+ }
+
+ @Override
+ synchronized public void sendNoResponse()
+ {
+ assert (_state > StateNotValidated);
+ try
+ {
+ if(--_dispatchCount == 0)
+ {
+ if(_state == StateFinished)
+ {
+ reap();
+ }
+ notifyAll();
+ }
+
+ if(_state >= StateClosed)
+ {
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
+ }
+
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ initiateShutdown();
+ }
+ }
+ catch(LocalException ex)
+ {
+ setState(StateClosed, ex);
+ }
+ }
+
+ @Override
+ public boolean systemException(int requestId, Ice.SystemException ex, boolean amd)
+ {
+ return false; // System exceptions aren't marshalled.
+ }
+
+ @Override
+ public synchronized void invokeException(int requestId, LocalException ex, int invokeNum, boolean amd)
+ {
+ //
+ // Fatal exception while invoking a request. Since
+ // sendResponse/sendNoResponse isn't
+ // called in case of a fatal exception we decrement _dispatchCount here.
+ //
+
+ setState(StateClosed, ex);
+
+ if(invokeNum > 0)
+ {
+ assert (_dispatchCount > 0);
+ _dispatchCount -= invokeNum;
+ assert (_dispatchCount >= 0);
+ if(_dispatchCount == 0)
+ {
+ if(_state == StateFinished)
+ {
+ reap();
+ }
+ notifyAll();
+ }
+ }
+ }
+
+ public IceInternal.EndpointI endpoint()
+ {
+ return _endpoint; // No mutex protection necessary, _endpoint is
+ // immutable.
+ }
+
+ public IceInternal.Connector connector()
+ {
+ return _connector; // No mutex protection necessary, _connector is
+ // immutable.
+ }
+
+ @Override
+ public synchronized void setAdapter(ObjectAdapter adapter)
+ {
+ if(_state <= StateNotValidated || _state >= StateClosing)
+ {
+ return;
+ }
+
+ _adapter = adapter;
+
+ if(_adapter != null)
+ {
+ _servantManager = ((ObjectAdapterI) _adapter).getServantManager();
+ if(_servantManager == null)
+ {
+ _adapter = null;
+ }
+ }
+ else
+ {
+ _servantManager = null;
+ }
+
+ //
+ // We never change the thread pool with which we were
+ // initially registered, even if we add or remove an object
+ // adapter.
+ //
+ }
+
+ @Override
+ public synchronized ObjectAdapter getAdapter()
+ {
+ return _adapter;
+ }
+
+ @Override
+ public Endpoint getEndpoint()
+ {
+ return _endpoint; // No mutex protection necessary, _endpoint is
+ // immutable.
+ }
+
+ @Override
+ public ObjectPrx createProxy(Identity ident)
+ {
+ //
+ // Create a reference and return a reverse proxy for this
+ // reference.
+ //
+ return _instance.proxyFactory().referenceToProxy(_instance.referenceFactory().create(ident, this));
+ }
+
+ //
+ // Operations from EventHandler
+ //
+ @Override
+ public void message(IceInternal.ThreadPoolCurrent current)
+ {
+ StartCallback startCB = null;
+ java.util.List<OutgoingMessage> sentCBs = null;
+ MessageInfo info = null;
+ int dispatchCount = 0;
+
+ synchronized(this)
+ {
+ if(_state >= StateClosed)
+ {
+ return;
+ }
+
+ if(!current.ioReady())
+ {
+ return;
+ }
+
+ int readyOp = current.operation;
+ try
+ {
+ unscheduleTimeout(current.operation);
+
+ int writeOp = IceInternal.SocketOperation.None;
+ int readOp = IceInternal.SocketOperation.None;
+
+ if((readyOp & IceInternal.SocketOperation.Write) != 0)
+ {
+ final IceInternal.Buffer buf = _writeStream.getBuffer();
+ if(_observer != null)
+ {
+ observerStartWrite(buf);
+ }
+ writeOp = write(buf);
+ if(_observer != null && (writeOp & IceInternal.SocketOperation.Write) == 0)
+ {
+ observerFinishWrite(buf);
+ }
+ }
+
+ while((readyOp & IceInternal.SocketOperation.Read) != 0)
+ {
+ final IceInternal.Buffer buf = _readStream.getBuffer();
+ if(_observer != null && !_readHeader)
+ {
+ observerStartRead(buf);
+ }
+
+ readOp = read(buf);
+ if((readOp & IceInternal.SocketOperation.Read) != 0)
+ {
+ break;
+ }
+ if(_observer != null && !_readHeader)
+ {
+ assert (!buf.b.hasRemaining());
+ observerFinishRead(buf);
+ }
+
+ if(_readHeader) // Read header if necessary.
+ {
+ _readHeader = false;
+
+ if(_observer != null)
+ {
+ _observer.receivedBytes(IceInternal.Protocol.headerSize);
+ }
+
+ int pos = _readStream.pos();
+ if(pos < IceInternal.Protocol.headerSize)
+ {
+ //
+ // This situation is possible for small UDP packets.
+ //
+ throw new Ice.IllegalMessageSizeException();
+ }
+
+ _readStream.pos(0);
+ byte[] m = new byte[4];
+ m[0] = _readStream.readByte();
+ m[1] = _readStream.readByte();
+ m[2] = _readStream.readByte();
+ m[3] = _readStream.readByte();
+ if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] ||
+ m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
+ {
+ Ice.BadMagicException ex = new Ice.BadMagicException();
+ ex.badMagic = m;
+ throw ex;
+ }
+
+ _readProtocol.__read(_readStream);
+ IceInternal.Protocol.checkSupportedProtocol(_readProtocol);
+
+ _readProtocolEncoding.__read(_readStream);
+ IceInternal.Protocol.checkSupportedProtocolEncoding(_readProtocolEncoding);
+
+ _readStream.readByte(); // messageType
+ _readStream.readByte(); // compress
+ int size = _readStream.readInt();
+ if(size < IceInternal.Protocol.headerSize)
+ {
+ throw new Ice.IllegalMessageSizeException();
+ }
+ if(size > _messageSizeMax)
+ {
+ IceInternal.Ex.throwMemoryLimitException(size, _messageSizeMax);
+ }
+ if(size > _readStream.size())
+ {
+ _readStream.resize(size);
+ }
+ _readStream.pos(pos);
+ }
+
+ if(_readStream.pos() != _readStream.size())
+ {
+ if(_endpoint.datagram())
+ {
+ // The message was truncated.
+ throw new Ice.DatagramLimitException();
+ }
+ continue;
+ }
+ break;
+ }
+
+ int newOp = readOp | writeOp;
+ readyOp = readyOp & ~newOp;
+ assert (readyOp != 0 || newOp != 0);
+
+ if(_state <= StateNotValidated)
+ {
+ if(newOp != 0)
+ {
+ //
+ // Wait for all the transceiver conditions to be
+ // satisfied before continuing.
+ //
+ scheduleTimeout(newOp);
+ _threadPool.update(this, current.operation, newOp);
+ return;
+ }
+
+ if(_state == StateNotInitialized && !initialize(current.operation))
+ {
+ return;
+ }
+
+ if(_state <= StateNotValidated && !validate(current.operation))
+ {
+ return;
+ }
+
+ _threadPool.unregister(this, current.operation);
+
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
+ if(_startCallback != null)
+ {
+ startCB = _startCallback;
+ _startCallback = null;
+ if(startCB != null)
+ {
+ ++dispatchCount;
+ }
+ }
+ }
+ else
+ {
+ assert (_state <= StateClosingPending);
+
+ //
+ // We parse messages first, if we receive a close
+ // connection message we won't send more messages.
+ //
+ if((readyOp & IceInternal.SocketOperation.Read) != 0)
+ {
+ // Optimization: use the thread's stream.
+ info = new MessageInfo(current.stream);
+ newOp |= parseMessage(info);
+ dispatchCount += info.messageDispatchCount;
+ }
+
+ if((readyOp & IceInternal.SocketOperation.Write) != 0)
+ {
+ sentCBs = new java.util.LinkedList<OutgoingMessage>();
+ newOp |= sendNextMessage(sentCBs);
+ if(!sentCBs.isEmpty())
+ {
+ ++dispatchCount;
+ }
+ else
+ {
+ sentCBs = null;
+ }
+ }
+
+ if(_state < StateClosed)
+ {
+ scheduleTimeout(newOp);
+ _threadPool.update(this, current.operation, newOp);
+ }
+ }
+
+ if(_acmLastActivity > 0)
+ {
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
+ }
+
+ if(dispatchCount == 0)
+ {
+ return; // Nothing to dispatch we're done!
+ }
+
+ _dispatchCount += dispatchCount;
+ current.ioCompleted();
+ }
+ catch(DatagramLimitException ex) // Expected.
+ {
+ if(_warnUdp)
+ {
+ _logger.warning("maximum datagram size of " + _readStream.pos() + " exceeded");
+ }
+ _readStream.resize(IceInternal.Protocol.headerSize);
+ _readStream.pos(0);
+ _readHeader = true;
+ return;
+ }
+ catch(SocketException ex)
+ {
+ setState(StateClosed, ex);
+ return;
+ }
+ catch(LocalException ex)
+ {
+ if(_endpoint.datagram())
+ {
+ if(_warn)
+ {
+ String s = "datagram connection exception:\n" + ex + '\n' + _desc;
+ _logger.warning(s);
+ }
+ _readStream.resize(IceInternal.Protocol.headerSize);
+ _readStream.pos(0);
+ _readHeader = true;
+ }
+ else
+ {
+ setState(StateClosed, ex);
+ }
+ return;
+ }
+ }
+
+ if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher.
+ {
+ dispatch(startCB, sentCBs, info);
+ }
+ else
+ {
+ // No need for the stream if heartbeat callback
+ if(info != null && info.heartbeatCallback == null)
+ {
+ //
+ // Create a new stream for the dispatch instead of using the
+ // thread pool's thread stream.
+ //
+ assert (info.stream == current.stream);
+ InputStream stream = info.stream;
+ info.stream = new InputStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
+ info.stream.swap(stream);
+ }
+
+ final StartCallback finalStartCB = startCB;
+ final java.util.List<OutgoingMessage> finalSentCBs = sentCBs;
+ final MessageInfo finalInfo = info;
+ _threadPool.dispatchFromThisThread(new IceInternal.DispatchWorkItem(this)
+ {
+ @Override
+ public void run()
+ {
+ dispatch(finalStartCB, finalSentCBs, finalInfo);
+ }
+ });
+ }
+ }
+
+ protected void dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info)
+ {
+ int dispatchedCount = 0;
+
+ //
+ // Notify the factory that the connection establishment and
+ // validation has completed.
+ //
+ if(startCB != null)
+ {
+ startCB.connectionStartCompleted(this);
+ ++dispatchedCount;
+ }
+
+ //
+ // Notify AMI calls that the message was sent.
+ //
+ if(sentCBs != null)
+ {
+ for(OutgoingMessage msg : sentCBs)
+ {
+ msg.outAsync.invokeSent();
+ }
+ ++dispatchedCount;
+ }
+
+ if(info != null)
+ {
+ //
+ // Asynchronous replies must be handled outside the thread
+ // synchronization, so that nested calls are possible.
+ //
+ if(info.outAsync != null)
+ {
+ info.outAsync.invokeCompleted();
+ ++dispatchedCount;
+ }
+
+ if(info.heartbeatCallback != null)
+ {
+ try
+ {
+ info.heartbeatCallback.heartbeat(this);
+ }
+ catch(Exception ex)
+ {
+ _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
+ }
+ ++dispatchedCount;
+ }
+
+ //
+ // Method invocation (or multiple invocations for batch messages)
+ // must be done outside the thread synchronization, so that nested
+ // calls are possible.
+ //
+ if(info.invokeNum > 0)
+ {
+ invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter);
+
+ //
+ // Don't increase dispatchedCount, the dispatch count is
+ // decreased when the incoming reply is sent.
+ //
+ }
+ }
+
+ //
+ // Decrease dispatch count.
+ //
+ if(dispatchedCount > 0)
+ {
+ boolean queueShutdown = false;
+
+ synchronized(this)
+ {
+ _dispatchCount -= dispatchedCount;
+ if(_dispatchCount == 0)
+ {
+ //
+ // Only initiate shutdown if not already done. It might
+ // have already been done if the sent callback or AMI
+ // callback was dispatched when the connection was already
+ // in the closing state.
+ //
+ if(_state == StateClosing)
+ {
+ if(_instance.queueRequests())
+ {
+ //
+ // We can't call initiateShutdown() from this thread in certain
+ // situations (such as in Android).
+ //
+ queueShutdown = true;
+ }
+ else
+ {
+ try
+ {
+ initiateShutdown();
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ }
+ }
+ }
+ else if(_state == StateFinished)
+ {
+ reap();
+ }
+ if(!queueShutdown)
+ {
+ notifyAll();
+ }
+ }
+ }
+
+ if(queueShutdown)
+ {
+ _instance.getQueueExecutor().executeNoThrow(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ synchronized(ConnectionI.this)
+ {
+ try
+ {
+ initiateShutdown();
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ }
+ ConnectionI.this.notifyAll();
+ }
+ return null;
+ }
+ });
+ }
+ }
+ }
+
+ @Override
+ public void finished(IceInternal.ThreadPoolCurrent current, final boolean close)
+ {
+ synchronized(this)
+ {
+ assert (_state == StateClosed);
+ unscheduleTimeout(IceInternal.SocketOperation.Read | IceInternal.SocketOperation.Write);
+ }
+
+ //
+ // If there are no callbacks to call, we don't call ioCompleted() since
+ // we're not going to call code that will potentially block (this avoids
+ // promoting a new leader and unecessary thread creation, especially if
+ // this is called on shutdown).
+ //
+ if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty() &&
+ _closeCallback == null && _heartbeatCallback == null)
+ {
+ finish(close);
+ return;
+ }
+
+ current.ioCompleted();
+ if(!_dispatcher) // Optimization, call finish() directly if there's no
+ // dispatcher.
+ {
+ finish(close);
+ }
+ else
+ {
+ _threadPool.dispatchFromThisThread(new IceInternal.DispatchWorkItem(this)
+ {
+ @Override
+ public void run()
+ {
+ finish(close);
+ }
+ });
+ }
+ }
+
+ public void finish(boolean close)
+ {
+ if(!_initialized)
+ {
+ if(_instance.traceLevels().network >= 2)
+ {
+ StringBuffer s = new StringBuffer("failed to ");
+ s.append(_connector != null ? "establish" : "accept");
+ s.append(" ");
+ s.append(_endpoint.protocol());
+ s.append(" connection\n");
+ s.append(toString());
+ s.append("\n");
+ s.append(_exception);
+ _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.toString());
+ }
+ }
+ else
+ {
+ if(_instance.traceLevels().network >= 1)
+ {
+ StringBuffer s = new StringBuffer("closed ");
+ s.append(_endpoint.protocol());
+ s.append(" connection\n");
+ s.append(toString());
+
+ //
+ // Trace the cause of unexpected connection closures
+ //
+ if(!(_exception instanceof CloseConnectionException ||
+ _exception instanceof ForcedCloseConnectionException ||
+ _exception instanceof ConnectionTimeoutException ||
+ _exception instanceof CommunicatorDestroyedException ||
+ _exception instanceof ObjectAdapterDeactivatedException))
+ {
+ s.append("\n");
+ s.append(_exception);
+ }
+ _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.toString());
+ }
+ }
+
+ if(close)
+ {
+ try
+ {
+ _transceiver.close();
+ }
+ catch(Ice.LocalException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "unexpected connection exception:\n " + _desc + "\n" + sw.toString();
+ _instance.initializationData().logger.error(s);
+ }
+ }
+
+ if(_startCallback != null)
+ {
+ if(_instance.queueRequests())
+ {
+ // The connectStartFailed method might try to connect with another
+ // connector.
+ _instance.getQueueExecutor().executeNoThrow(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ _startCallback.connectionStartFailed(ConnectionI.this, _exception);
+ return null;
+ }
+ });
+ }
+ else
+ {
+ _startCallback.connectionStartFailed(this, _exception);
+ }
+ _startCallback = null;
+ }
+
+ if(!_sendStreams.isEmpty())
+ {
+ if(!_writeStream.isEmpty())
+ {
+ //
+ // Return the stream to the outgoing call. This is important for
+ // retriable AMI calls which are not marshalled again.
+ //
+ OutgoingMessage message = _sendStreams.getFirst();
+ _writeStream.swap(message.stream);
+ }
+
+ for(OutgoingMessage p : _sendStreams)
+ {
+ p.completed(_exception);
+ if(p.requestId > 0) // Make sure finished isn't called twice.
+ {
+ _asyncRequests.remove(p.requestId);
+ }
+ }
+ _sendStreams.clear();
+ }
+
+ for(IceInternal.OutgoingAsyncBase p : _asyncRequests.values())
+ {
+ if(p.completed(_exception))
+ {
+ p.invokeCompleted();
+ }
+ }
+ _asyncRequests.clear();
+
+ //
+ // Don't wait to be reaped to reclaim memory allocated by read/write streams.
+ //
+ _writeStream.clear();
+ _writeStream.getBuffer().clear();
+ _readStream.clear();
+ _readStream.getBuffer().clear();
+
+ if(_closeCallback != null)
+ {
+ try
+ {
+ _closeCallback.closed(this);
+ }
+ catch(Exception ex)
+ {
+ _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
+ }
+ _closeCallback = null;
+ }
+
+ _heartbeatCallback = null;
+
+ //
+ // This must be done last as this will cause waitUntilFinished() to
+ // return (and communicator objects such as the timer might be destroyed
+ // too).
+ //
+ synchronized(this)
+ {
+ setState(StateFinished);
+
+ if(_dispatchCount == 0)
+ {
+ reap();
+ }
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return _toString();
+ }
+
+ @Override
+ public java.nio.channels.SelectableChannel fd()
+ {
+ return _transceiver.fd();
+ }
+
+ @Override
+ public void setReadyCallback(IceInternal.ReadyCallback callback)
+ {
+ _transceiver.setReadyCallback(callback);
+ }
+
+ public synchronized void timedOut()
+ {
+ if(_state <= StateNotValidated)
+ {
+ setState(StateClosed, new ConnectTimeoutException());
+ }
+ else if(_state < StateClosing)
+ {
+ setState(StateClosed, new TimeoutException());
+ }
+ else if(_state < StateClosed)
+ {
+ setState(StateClosed, new CloseTimeoutException());
+ }
+ }
+
+ @Override
+ public String type()
+ {
+ return _type; // No mutex lock, _type is immutable.
+ }
+
+ @Override
+ public int timeout()
+ {
+ return _endpoint.timeout(); // No mutex protection necessary, _endpoint
+ // is immutable.
+ }
+
+ @Override
+ public synchronized ConnectionInfo getInfo()
+ {
+ if(_state >= StateClosed)
+ {
+ throw (Ice.LocalException) _exception.fillInStackTrace();
+ }
+ return initConnectionInfo();
+ }
+
+ @Override
+ public synchronized void setBufferSize(int rcvSize, int sndSize)
+ {
+ if(_state >= StateClosed)
+ {
+ throw (Ice.LocalException) _exception.fillInStackTrace();
+ }
+ _transceiver.setBufferSize(rcvSize, sndSize);
+ _info = null; // Invalidate the cached connection info
+ }
+
+ @Override
+ public String _toString()
+ {
+ return _desc; // No mutex lock, _desc is immutable.
+ }
+
+ public synchronized void exception(LocalException ex)
+ {
+ setState(StateClosed, ex);
+ }
+
+ public ConnectionI(Communicator communicator, IceInternal.Instance instance, IceInternal.ACMMonitor monitor,
+ IceInternal.Transceiver transceiver, IceInternal.Connector connector,
+ IceInternal.EndpointI endpoint, ObjectAdapterI adapter)
+ {
+ _communicator = communicator;
+ _instance = instance;
+ _monitor = monitor;
+ _transceiver = transceiver;
+ _desc = transceiver.toString();
+ _type = transceiver.protocol();
+ _connector = connector;
+ _endpoint = endpoint;
+ _adapter = adapter;
+ final Ice.InitializationData initData = instance.initializationData();
+ // Cached for better performance.
+ _dispatcher = initData.dispatcher != null;
+ _logger = initData.logger; // Cached for better performance.
+ _traceLevels = instance.traceLevels(); // Cached for better performance.
+ _timer = instance.timer();
+ _writeTimeout = new TimeoutCallback();
+ _writeTimeoutFuture = null;
+ _readTimeout = new TimeoutCallback();
+ _readTimeoutFuture = null;
+ _warn = initData.properties.getPropertyAsInt("Ice.Warn.Connections") > 0;
+ _warnUdp = instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;
+ _cacheBuffers = instance.cacheMessageBuffers();
+ if(_monitor != null && _monitor.getACM().timeout > 0)
+ {
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
+ }
+ else
+ {
+ _acmLastActivity = -1;
+ }
+ _nextRequestId = 1;
+ _messageSizeMax = adapter != null ? adapter.messageSizeMax() : instance.messageSizeMax();
+ _batchRequestQueue = new IceInternal.BatchRequestQueue(instance, _endpoint.datagram());
+ _readStream = new InputStream(instance, IceInternal.Protocol.currentProtocolEncoding);
+ _readHeader = false;
+ _readStreamPos = -1;
+ _writeStream = new OutputStream(instance, IceInternal.Protocol.currentProtocolEncoding);
+ _writeStreamPos = -1;
+ _dispatchCount = 0;
+ _state = StateNotInitialized;
+
+ int compressionLevel = initData.properties.getPropertyAsIntWithDefault("Ice.Compression.Level", 1);
+ if(compressionLevel < 1)
+ {
+ compressionLevel = 1;
+ }
+ else if(compressionLevel > 9)
+ {
+ compressionLevel = 9;
+ }
+ _compressionLevel = compressionLevel;
+
+ if(adapter != null)
+ {
+ _servantManager = adapter.getServantManager();
+ }
+ else
+ {
+ _servantManager = null;
+ }
+
+ try
+ {
+ if(adapter != null)
+ {
+ _threadPool = adapter.getThreadPool();
+ }
+ else
+ {
+ _threadPool = _instance.clientThreadPool();
+ }
+ _threadPool.initialize(this);
+ }
+ catch(Ice.LocalException ex)
+ {
+ throw ex;
+ }
+ catch(java.lang.Exception ex)
+ {
+ throw new Ice.SyscallException(ex);
+ }
+ }
+
+ @Override
+ protected synchronized void finalize() throws Throwable
+ {
+ try
+ {
+ IceUtilInternal.Assert.FinalizerAssert(_startCallback == null);
+ IceUtilInternal.Assert.FinalizerAssert(_state == StateFinished);
+ IceUtilInternal.Assert.FinalizerAssert(_dispatchCount == 0);
+ IceUtilInternal.Assert.FinalizerAssert(_sendStreams.isEmpty());
+ IceUtilInternal.Assert.FinalizerAssert(_asyncRequests.isEmpty());
+ }
+ catch(java.lang.Exception ex)
+ {
+ }
+ finally
+ {
+ super.finalize();
+ }
+ }
+
+ private static final int StateNotInitialized = 0;
+ private static final int StateNotValidated = 1;
+ private static final int StateActive = 2;
+ private static final int StateHolding = 3;
+ private static final int StateClosing = 4;
+ private static final int StateClosingPending = 5;
+ private static final int StateClosed = 6;
+ private static final int StateFinished = 7;
+
+ private void setState(int state, LocalException ex)
+ {
+ //
+ // If setState() is called with an exception, then only closed
+ // and closing states are permissible.
+ //
+ assert state >= StateClosing;
+
+ if(_state == state) // Don't switch twice.
+ {
+ return;
+ }
+
+ if(_exception == null)
+ {
+ //
+ // If we are in closed state, an exception must be set.
+ //
+ assert (_state != StateClosed);
+
+ _exception = ex;
+
+ //
+ // We don't warn if we are not validated.
+ //
+ if(_warn && _validated)
+ {
+ //
+ // Don't warn about certain expected exceptions.
+ //
+ if(!(_exception instanceof CloseConnectionException ||
+ _exception instanceof ForcedCloseConnectionException ||
+ _exception instanceof ConnectionTimeoutException ||
+ _exception instanceof CommunicatorDestroyedException ||
+ _exception instanceof ObjectAdapterDeactivatedException ||
+ (_exception instanceof ConnectionLostException && _state >= StateClosing)))
+ {
+ warning("connection exception", _exception);
+ }
+ }
+ }
+
+ //
+ // We must set the new state before we notify requests of any
+ // exceptions. Otherwise new requests may retry on a
+ // connection that is not yet marked as closed or closing.
+ //
+ setState(state);
+ }
+
+ private void setState(int state)
+ {
+ //
+ // We don't want to send close connection messages if the endpoint
+ // only supports oneway transmission from client to server.
+ //
+ if(_endpoint.datagram() && state == StateClosing)
+ {
+ state = StateClosed;
+ }
+
+ //
+ // Skip graceful shutdown if we are destroyed before validation.
+ //
+ if(_state <= StateNotValidated && state == StateClosing)
+ {
+ state = StateClosed;
+ }
+
+ if(_state == state) // Don't switch twice.
+ {
+ return;
+ }
+
+ try
+ {
+ switch(state)
+ {
+ case StateNotInitialized:
+ {
+ assert (false);
+ break;
+ }
+
+ case StateNotValidated:
+ {
+ if(_state != StateNotInitialized)
+ {
+ assert (_state == StateClosed);
+ return;
+ }
+ break;
+ }
+
+ case StateActive:
+ {
+ //
+ // Can only switch from holding or not validated to
+ // active.
+ //
+ if(_state != StateHolding && _state != StateNotValidated)
+ {
+ return;
+ }
+ _threadPool.register(this, IceInternal.SocketOperation.Read);
+ break;
+ }
+
+ case StateHolding:
+ {
+ //
+ // Can only switch from active or not validated to
+ // holding.
+ //
+ if(_state != StateActive && _state != StateNotValidated)
+ {
+ return;
+ }
+ if(_state == StateActive)
+ {
+ _threadPool.unregister(this, IceInternal.SocketOperation.Read);
+ }
+ break;
+ }
+
+ case StateClosing:
+ case StateClosingPending:
+ {
+ //
+ // Can't change back from closing pending.
+ //
+ if(_state >= StateClosingPending)
+ {
+ return;
+ }
+ break;
+ }
+
+ case StateClosed:
+ {
+ if(_state == StateFinished)
+ {
+ return;
+ }
+
+ _batchRequestQueue.destroy(_exception);
+
+ //
+ // Don't need to close now for connections so only close the transceiver
+ // if the selector request it.
+ //
+ if(_threadPool.finish(this, false))
+ {
+ _transceiver.close();
+ }
+ break;
+ }
+
+ case StateFinished:
+ {
+ assert (_state == StateClosed);
+ _communicator = null;
+ break;
+ }
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "unexpected connection exception:\n " + _desc + "\n" + sw.toString();
+ _instance.initializationData().logger.error(s);
+ }
+
+ //
+ // We only register with the connection monitor if our new state
+ // is StateActive. Otherwise we unregister with the connection
+ // monitor, but only if we were registered before, i.e., if our
+ // old state was StateActive.
+ //
+ if(_monitor != null)
+ {
+ if(state == StateActive)
+ {
+ if(_acmLastActivity > 0)
+ {
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
+ }
+ _monitor.add(this);
+ }
+ else if(_state == StateActive)
+ {
+ _monitor.remove(this);
+ }
+ }
+
+ if(_instance.initializationData().observer != null)
+ {
+ Ice.Instrumentation.ConnectionState oldState = toConnectionState(_state);
+ Ice.Instrumentation.ConnectionState newState = toConnectionState(state);
+ if(oldState != newState)
+ {
+ _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ newState,
+ _observer);
+ if(_observer != null)
+ {
+ _observer.attach();
+ }
+ else
+ {
+ _writeStreamPos = -1;
+ _readStreamPos = -1;
+ }
+ }
+ if(_observer != null && state == StateClosed && _exception != null)
+ {
+ if(!(_exception instanceof CloseConnectionException ||
+ _exception instanceof ForcedCloseConnectionException ||
+ _exception instanceof ConnectionTimeoutException ||
+ _exception instanceof CommunicatorDestroyedException ||
+ _exception instanceof ObjectAdapterDeactivatedException ||
+ (_exception instanceof ConnectionLostException && _state >= StateClosing)))
+ {
+ _observer.failed(_exception.ice_id());
+ }
+ }
+ }
+ _state = state;
+
+ notifyAll();
+
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ try
+ {
+ initiateShutdown();
+ }
+ catch(LocalException ex)
+ {
+ setState(StateClosed, ex);
+ }
+ }
+ }
+
+ private void initiateShutdown()
+ {
+ assert (_state == StateClosing);
+ assert (_dispatchCount == 0);
+
+ if(_shutdownInitiated)
+ {
+ return;
+ }
+ _shutdownInitiated = true;
+
+ if(!_endpoint.datagram())
+ {
+ //
+ // Before we shut down, we send a close connection message.
+ //
+ OutputStream os = new OutputStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
+ os.writeBlob(IceInternal.Protocol.magic);
+ IceInternal.Protocol.currentProtocol.__write(os);
+ IceInternal.Protocol.currentProtocolEncoding.__write(os);
+ os.writeByte(IceInternal.Protocol.closeConnectionMsg);
+ os.writeByte((byte) 0); // compression status: always report 0 for
+ // CloseConnection in Java.
+ os.writeInt(IceInternal.Protocol.headerSize); // Message size.
+
+ if((sendMessage(new OutgoingMessage(os, false, false)) & IceInternal.AsyncStatus.Sent) > 0)
+ {
+ setState(StateClosingPending);
+
+ //
+ // Notify the the transceiver of the graceful connection
+ // closure.
+ //
+ int op = _transceiver.closing(true, _exception);
+ if(op != 0)
+ {
+ scheduleTimeout(op);
+ _threadPool.register(this, op);
+ }
+ }
+ }
+ }
+
+ private void heartbeat()
+ {
+ assert (_state == StateActive);
+
+ if(!_endpoint.datagram())
+ {
+ OutputStream os = new OutputStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
+ os.writeBlob(IceInternal.Protocol.magic);
+ IceInternal.Protocol.currentProtocol.__write(os);
+ IceInternal.Protocol.currentProtocolEncoding.__write(os);
+ os.writeByte(IceInternal.Protocol.validateConnectionMsg);
+ os.writeByte((byte) 0);
+ os.writeInt(IceInternal.Protocol.headerSize); // Message size.
+
+ try
+ {
+ OutgoingMessage message = new OutgoingMessage(os, false, false);
+ sendMessage(message);
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ assert (_exception != null);
+ }
+ }
+ }
+
+ private boolean initialize(int operation)
+ {
+ int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer());
+ if(s != IceInternal.SocketOperation.None)
+ {
+ scheduleTimeout(s);
+ _threadPool.update(this, operation, s);
+ return false;
+ }
+
+ //
+ // Update the connection description once the transceiver is
+ // initialized.
+ //
+ _desc = _transceiver.toString();
+ _initialized = true;
+ setState(StateNotValidated);
+
+ return true;
+ }
+
+ private boolean validate(int operation)
+ {
+ if(!_endpoint.datagram()) // Datagram connections are always implicitly
+ // validated.
+ {
+ if(_adapter != null) // The server side has the active role for
+ // connection validation.
+ {
+ if(_writeStream.isEmpty())
+ {
+ _writeStream.writeBlob(IceInternal.Protocol.magic);
+ IceInternal.Protocol.currentProtocol.__write(_writeStream);
+ IceInternal.Protocol.currentProtocolEncoding.__write(_writeStream);
+ _writeStream.writeByte(IceInternal.Protocol.validateConnectionMsg);
+ _writeStream.writeByte((byte) 0); // Compression status
+ // (always zero for
+ // validate connection).
+ _writeStream.writeInt(IceInternal.Protocol.headerSize); // Message
+ // size.
+ IceInternal.TraceUtil.traceSend(_writeStream, _logger, _traceLevels);
+ _writeStream.prepareWrite();
+ }
+
+ if(_observer != null)
+ {
+ observerStartWrite(_writeStream.getBuffer());
+ }
+
+ if(_writeStream.pos() != _writeStream.size())
+ {
+ int op = write(_writeStream.getBuffer());
+ if(op != 0)
+ {
+ scheduleTimeout(op);
+ _threadPool.update(this, operation, op);
+ return false;
+ }
+ }
+
+ if(_observer != null)
+ {
+ observerFinishWrite(_writeStream.getBuffer());
+ }
+ }
+ else
+ // The client side has the passive role for connection validation.
+ {
+ if(_readStream.isEmpty())
+ {
+ _readStream.resize(IceInternal.Protocol.headerSize);
+ _readStream.pos(0);
+ }
+
+ if(_observer != null)
+ {
+ observerStartRead(_readStream.getBuffer());
+ }
+
+ if(_readStream.pos() != _readStream.size())
+ {
+ int op = read(_readStream.getBuffer());
+ if(op != 0)
+ {
+ scheduleTimeout(op);
+ _threadPool.update(this, operation, op);
+ return false;
+ }
+ }
+
+ if(_observer != null)
+ {
+ observerFinishRead(_readStream.getBuffer());
+ }
+
+ assert (_readStream.pos() == IceInternal.Protocol.headerSize);
+ _readStream.pos(0);
+ byte[] m = _readStream.readBlob(4);
+ if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] ||
+ m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3])
+ {
+ BadMagicException ex = new BadMagicException();
+ ex.badMagic = m;
+ throw ex;
+ }
+
+ _readProtocol.__read(_readStream);
+ IceInternal.Protocol.checkSupportedProtocol(_readProtocol);
+
+ _readProtocolEncoding.__read(_readStream);
+ IceInternal.Protocol.checkSupportedProtocolEncoding(_readProtocolEncoding);
+
+ byte messageType = _readStream.readByte();
+ if(messageType != IceInternal.Protocol.validateConnectionMsg)
+ {
+ throw new ConnectionNotValidatedException();
+ }
+ _readStream.readByte(); // Ignore compression status for
+ // validate connection.
+ int size = _readStream.readInt();
+ if(size != IceInternal.Protocol.headerSize)
+ {
+ throw new IllegalMessageSizeException();
+ }
+ IceInternal.TraceUtil.traceRecv(_readStream, _logger, _traceLevels);
+
+ _validated = true;
+ }
+ }
+
+ _writeStream.resize(0);
+ _writeStream.pos(0);
+
+ _readStream.resize(IceInternal.Protocol.headerSize);
+ _readStream.pos(0);
+ _readHeader = true;
+
+ if(_instance.traceLevels().network >= 1)
+ {
+ StringBuffer s = new StringBuffer();
+ if(_endpoint.datagram())
+ {
+ s.append("starting to ");
+ s.append(_connector != null ? "send" : "receive");
+ s.append(" ");
+ s.append(_endpoint.protocol());
+ s.append(" messages\n");
+ s.append(_transceiver.toDetailedString());
+ }
+ else
+ {
+ s.append(_connector != null ? "established" : "accepted");
+ s.append(" ");
+ s.append(_endpoint.protocol());
+ s.append(" connection\n");
+ s.append(toString());
+ }
+ _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.toString());
+ }
+
+ return true;
+ }
+
+ private int sendNextMessage(java.util.List<OutgoingMessage> callbacks)
+ {
+ if(_sendStreams.isEmpty())
+ {
+ return IceInternal.SocketOperation.None;
+ }
+ else if(_state == StateClosingPending && _writeStream.pos() == 0)
+ {
+ // Message wasn't sent, empty the _writeStream, we're not going to
+ // send more data.
+ OutgoingMessage message = _sendStreams.getFirst();
+ _writeStream.swap(message.stream);
+ return IceInternal.SocketOperation.None;
+ }
+
+ assert (!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());
+ try
+ {
+ while(true)
+ {
+ //
+ // Notify the message that it was sent.
+ //
+ OutgoingMessage message = _sendStreams.getFirst();
+ _writeStream.swap(message.stream);
+ if(message.sent())
+ {
+ callbacks.add(message);
+ }
+ _sendStreams.removeFirst();
+
+ //
+ // If there's nothing left to send, we're done.
+ //
+ if(_sendStreams.isEmpty())
+ {
+ break;
+ }
+
+ //
+ // If we are in the closed state or if the close is
+ // pending, don't continue sending.
+ //
+ // This can occur if parseMessage (called before
+ // sendNextMessage by message()) closes the connection.
+ //
+ if(_state >= StateClosingPending)
+ {
+ return IceInternal.SocketOperation.None;
+ }
+
+ //
+ // Otherwise, prepare the next message stream for writing.
+ //
+ message = _sendStreams.getFirst();
+ assert (!message.prepared);
+ OutputStream stream = message.stream;
+
+ message.stream = doCompress(stream, message.compress);
+ message.stream.prepareWrite();
+ message.prepared = true;
+
+ IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels);
+
+ _writeStream.swap(message.stream);
+
+ //
+ // Send the message.
+ //
+ if(_observer != null)
+ {
+ observerStartWrite(_writeStream.getBuffer());
+ }
+ if(_writeStream.pos() != _writeStream.size())
+ {
+ int op = write(_writeStream.getBuffer());
+ if(op != 0)
+ {
+ return op;
+ }
+ }
+ if(_observer != null)
+ {
+ observerFinishWrite(_writeStream.getBuffer());
+ }
+ }
+
+ //
+ // If all the messages were sent and we are in the closing state, we
+ // schedule the close timeout to wait for the peer to close the
+ // connection.
+ //
+ if(_state == StateClosing && _shutdownInitiated)
+ {
+ setState(StateClosingPending);
+ int op = _transceiver.closing(true, _exception);
+ if(op != 0)
+ {
+ return op;
+ }
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ }
+ return IceInternal.SocketOperation.None;
+ }
+
+ private int sendMessage(OutgoingMessage message)
+ {
+ assert (_state < StateClosed);
+
+ if(!_sendStreams.isEmpty())
+ {
+ message.adopt();
+ _sendStreams.addLast(message);
+ return IceInternal.AsyncStatus.Queued;
+ }
+
+ //
+ // Attempt to send the message without blocking. If the send blocks, we
+ // register the connection with the selector thread.
+ //
+
+ assert (!message.prepared);
+
+ OutputStream stream = message.stream;
+
+ message.stream = doCompress(stream, message.compress);
+ message.stream.prepareWrite();
+ message.prepared = true;
+ int op;
+
+ IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels);
+
+ //
+ // Send the message without blocking.
+ //
+ if(_observer != null)
+ {
+ observerStartWrite(message.stream.getBuffer());
+ }
+ op = write(message.stream.getBuffer());
+ if(op == 0)
+ {
+ if(_observer != null)
+ {
+ observerFinishWrite(message.stream.getBuffer());
+ }
+
+ int status = IceInternal.AsyncStatus.Sent;
+ if(message.sent())
+ {
+ status |= IceInternal.AsyncStatus.InvokeSentCallback;
+ }
+
+ if(_acmLastActivity > 0)
+ {
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
+ }
+ return status;
+ }
+
+ message.adopt();
+
+ _writeStream.swap(message.stream);
+ _sendStreams.addLast(message);
+ scheduleTimeout(op);
+ _threadPool.register(this, op);
+ return IceInternal.AsyncStatus.Queued;
+ }
+
+ private OutputStream doCompress(OutputStream uncompressed, boolean compress)
+ {
+ boolean compressionSupported = false;
+ if(compress)
+ {
+ //
+ // Don't check whether compression support is available unless the
+ // proxy is configured for compression.
+ //
+ compressionSupported = IceInternal.BZip2.supported();
+ }
+
+ if(compressionSupported && uncompressed.size() >= 100)
+ {
+ //
+ // Do compression.
+ //
+ IceInternal.Buffer cbuf = IceInternal.BZip2.compress(uncompressed.getBuffer(),
+ IceInternal.Protocol.headerSize, _compressionLevel);
+ if(cbuf != null)
+ {
+ OutputStream cstream =
+ new OutputStream(uncompressed.instance(), uncompressed.getEncoding(), cbuf, true);
+
+ //
+ // Set compression status.
+ //
+ cstream.pos(9);
+ cstream.writeByte((byte) 2);
+
+ //
+ // Write the size of the compressed stream into the header.
+ //
+ cstream.pos(10);
+ cstream.writeInt(cstream.size());
+
+ //
+ // Write the compression status and size of the compressed
+ // stream into the header of the uncompressed stream -- we need
+ // this to trace requests correctly.
+ //
+ uncompressed.pos(9);
+ uncompressed.writeByte((byte) 2);
+ uncompressed.writeInt(cstream.size());
+
+ return cstream;
+ }
+ }
+
+ uncompressed.pos(9);
+ uncompressed.writeByte((byte) (compressionSupported ? 1 : 0));
+
+ //
+ // Not compressed, fill in the message size.
+ //
+ uncompressed.pos(10);
+ uncompressed.writeInt(uncompressed.size());
+
+ return uncompressed;
+ }
+
+ private static class MessageInfo
+ {
+ MessageInfo(InputStream stream)
+ {
+ this.stream = stream;
+ }
+
+ InputStream stream;
+ int invokeNum;
+ int requestId;
+ byte compress;
+ IceInternal.ServantManager servantManager;
+ ObjectAdapter adapter;
+ IceInternal.OutgoingAsyncBase outAsync;
+ HeartbeatCallback heartbeatCallback;
+ int messageDispatchCount;
+ }
+
+ private int parseMessage(MessageInfo info)
+ {
+ assert (_state > StateNotValidated && _state < StateClosed);
+
+ _readStream.swap(info.stream);
+ _readStream.resize(IceInternal.Protocol.headerSize);
+ _readStream.pos(0);
+ _readHeader = true;
+
+ assert (info.stream.pos() == info.stream.size());
+
+ //
+ // Connection is validated on first message. This is only used by
+ // setState() to check wether or not we can print a connection
+ // warning (a client might close the connection forcefully if the
+ // connection isn't validated).
+ //
+ _validated = true;
+
+ try
+ {
+ //
+ // We don't need to check magic and version here. This has already
+ // been done by the ThreadPool which provides us with the stream.
+ //
+ info.stream.pos(8);
+ byte messageType = info.stream.readByte();
+ info.compress = info.stream.readByte();
+ if(info.compress == (byte)2)
+ {
+ if(IceInternal.BZip2.supported())
+ {
+ IceInternal.Buffer ubuf = IceInternal.BZip2.uncompress(info.stream.getBuffer(),
+ IceInternal.Protocol.headerSize,
+ _messageSizeMax);
+ info.stream = new InputStream(info.stream.instance(), info.stream.getEncoding(), ubuf, true);
+ }
+ else
+ {
+ FeatureNotSupportedException ex = new FeatureNotSupportedException();
+ ex.unsupportedFeature = "Cannot uncompress compressed message: "
+ + "org.apache.tools.bzip2.CBZip2OutputStream was not found";
+ throw ex;
+ }
+ }
+ info.stream.pos(IceInternal.Protocol.headerSize);
+
+ switch(messageType)
+ {
+ case IceInternal.Protocol.closeConnectionMsg:
+ {
+ IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
+ if(_endpoint.datagram())
+ {
+ if(_warn)
+ {
+ _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
+ }
+ }
+ else
+ {
+ setState(StateClosingPending, new CloseConnectionException());
+
+ //
+ // Notify the the transceiver of the graceful connection
+ // closure.
+ //
+ int op = _transceiver.closing(false, _exception);
+ if(op != 0)
+ {
+ return op;
+ }
+ setState(StateClosed);
+ }
+ break;
+ }
+
+ case IceInternal.Protocol.requestMsg:
+ {
+ if(_state >= StateClosing)
+ {
+ IceInternal.TraceUtil.trace("received request during closing\n"
+ + "(ignored by server, client will retry)", info.stream, _logger,
+ _traceLevels);
+ }
+ else
+ {
+ IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
+ info.requestId = info.stream.readInt();
+ info.invokeNum = 1;
+ info.servantManager = _servantManager;
+ info.adapter = _adapter;
+ ++info.messageDispatchCount;
+ }
+ break;
+ }
+
+ case IceInternal.Protocol.requestBatchMsg:
+ {
+ if(_state >= StateClosing)
+ {
+ IceInternal.TraceUtil.trace("received batch request during closing\n"
+ + "(ignored by server, client will retry)", info.stream, _logger,
+ _traceLevels);
+ }
+ else
+ {
+ IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
+ info.invokeNum = info.stream.readInt();
+ if(info.invokeNum < 0)
+ {
+ info.invokeNum = 0;
+ throw new UnmarshalOutOfBoundsException();
+ }
+ info.servantManager = _servantManager;
+ info.adapter = _adapter;
+ info.messageDispatchCount += info.invokeNum;
+ }
+ break;
+ }
+
+ case IceInternal.Protocol.replyMsg:
+ {
+ IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
+ info.requestId = info.stream.readInt();
+
+ IceInternal.OutgoingAsyncBase outAsync = _asyncRequests.remove(info.requestId);
+ if(outAsync != null && outAsync.completed(info.stream))
+ {
+ info.outAsync = outAsync;
+ ++info.messageDispatchCount;
+ }
+ notifyAll(); // Notify threads blocked in close(false)
+ break;
+ }
+
+ case IceInternal.Protocol.validateConnectionMsg:
+ {
+ IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
+ if(_heartbeatCallback != null)
+ {
+ info.heartbeatCallback = _heartbeatCallback;
+ ++info.messageDispatchCount;
+ }
+ break;
+ }
+
+ default:
+ {
+ IceInternal.TraceUtil.trace("received unknown message\n(invalid, closing connection)", info.stream,
+ _logger, _traceLevels);
+ throw new UnknownMessageException();
+ }
+ }
+ }
+ catch(LocalException ex)
+ {
+ if(_endpoint.datagram())
+ {
+ if(_warn)
+ {
+ _logger.warning("datagram connection exception:\n" + ex + '\n' + _desc);
+ }
+ }
+ else
+ {
+ setState(StateClosed, ex);
+ }
+ }
+
+ return _state == StateHolding ? IceInternal.SocketOperation.None : IceInternal.SocketOperation.Read;
+ }
+
+ private void invokeAll(InputStream stream, int invokeNum, int requestId, byte compress,
+ IceInternal.ServantManager servantManager, ObjectAdapter adapter)
+ {
+ //
+ // Note: In contrast to other private or protected methods, this
+ // operation must be called *without* the mutex locked.
+ //
+
+ IceInternal.Incoming in = null;
+ try
+ {
+ while(invokeNum > 0)
+ {
+
+ //
+ // Prepare the invocation.
+ //
+ boolean response = !_endpoint.datagram() && requestId != 0;
+ in = getIncoming(adapter, response, compress, requestId);
+
+ //
+ // Dispatch the invocation.
+ //
+ in.invoke(servantManager, stream);
+
+ --invokeNum;
+
+ reclaimIncoming(in);
+ in = null;
+ }
+
+ stream.clear();
+ }
+ catch(LocalException ex)
+ {
+ invokeException(requestId, ex, invokeNum, false);
+ }
+ catch(IceInternal.ServantError ex)
+ {
+ //
+ // ServantError is thrown when an Error has been raised by servant (or servant locator)
+ // code. We've already attempted to complete the invocation and send a response.
+ //
+ Throwable t = ex.getCause();
+ //
+ // Suppress AssertionError and OutOfMemoryError, rethrow everything else.
+ //
+ if(!(t instanceof java.lang.AssertionError || t instanceof java.lang.OutOfMemoryError))
+ {
+ throw (java.lang.Error)t;
+ }
+ }
+ catch(java.lang.Error ex)
+ {
+ //
+ // An Error was raised outside of servant code (i.e., by Ice code).
+ // Attempt to log the error and clean up. This may still fail
+ // depending on the severity of the error.
+ //
+ // Note that this does NOT send a response to the client.
+ //
+ UnknownException uex = new UnknownException(ex);
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ uex.unknown = sw.toString();
+ _logger.error(uex.unknown);
+ invokeException(requestId, uex, invokeNum, false);
+ //
+ // Suppress AssertionError and OutOfMemoryError, rethrow everything else.
+ //
+ if(!(ex instanceof java.lang.AssertionError || ex instanceof java.lang.OutOfMemoryError))
+ {
+ throw ex;
+ }
+ }
+ finally
+ {
+ if(in != null)
+ {
+ reclaimIncoming(in);
+ }
+ }
+ }
+
+ private void scheduleTimeout(int status)
+ {
+ int timeout;
+ if(_state < StateActive)
+ {
+ IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideConnectTimeout)
+ {
+ timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
+ }
+ else
+ {
+ timeout = _endpoint.timeout();
+ }
+ }
+ else if(_state < StateClosingPending)
+ {
+ if(_readHeader) // No timeout for reading the header.
+ {
+ status &= ~IceInternal.SocketOperation.Read;
+ }
+ timeout = _endpoint.timeout();
+ }
+ else
+ {
+ IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideCloseTimeout)
+ {
+ timeout = defaultsAndOverrides.overrideCloseTimeoutValue;
+ }
+ else
+ {
+ timeout = _endpoint.timeout();
+ }
+ }
+
+ if(timeout < 0)
+ {
+ return;
+ }
+
+ try
+ {
+ if((status & IceInternal.SocketOperation.Read) != 0)
+ {
+ if(_readTimeoutFuture != null)
+ {
+ _readTimeoutFuture.cancel(false);
+ }
+ _readTimeoutFuture = _timer.schedule(_readTimeout, timeout, java.util.concurrent.TimeUnit.MILLISECONDS);
+ }
+ if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0)
+ {
+ if(_writeTimeoutFuture != null)
+ {
+ _writeTimeoutFuture.cancel(false);
+ }
+ _writeTimeoutFuture = _timer.schedule(_writeTimeout, timeout,
+ java.util.concurrent.TimeUnit.MILLISECONDS);
+ }
+ }
+ catch(Throwable ex)
+ {
+ assert (false);
+ }
+ }
+
+ private void unscheduleTimeout(int status)
+ {
+ if((status & IceInternal.SocketOperation.Read) != 0 && _readTimeoutFuture != null)
+ {
+ _readTimeoutFuture.cancel(false);
+ _readTimeoutFuture = null;
+ }
+ if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0 &&
+ _writeTimeoutFuture != null)
+ {
+ _writeTimeoutFuture.cancel(false);
+ _writeTimeoutFuture = null;
+ }
+ }
+
+ private ConnectionInfo initConnectionInfo()
+ {
+ if(_state > StateNotInitialized && _info != null) // Update the connection information until it's initialized
+ {
+ return _info;
+ }
+
+ try
+ {
+ _info = _transceiver.getInfo();
+ }
+ catch(Ice.LocalException ex)
+ {
+ _info = new ConnectionInfo();
+ }
+ for(ConnectionInfo info = _info; info != null; info = info.underlying)
+ {
+ info.connectionId = _endpoint.connectionId();
+ info.adapterName = _adapter != null ? _adapter.getName() : "";
+ info.incoming = _connector == null;
+ }
+ return _info;
+ }
+
+ private Ice.Instrumentation.ConnectionState toConnectionState(int state)
+ {
+ return connectionStateMap[state];
+ }
+
+ private void warning(String msg, java.lang.Exception ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = msg + ":\n" + _desc + "\n" + sw.toString();
+ _logger.warning(s);
+ }
+
+ private void observerStartRead(IceInternal.Buffer buf)
+ {
+ if(_readStreamPos >= 0)
+ {
+ assert (!buf.empty());
+ _observer.receivedBytes(buf.b.position() - _readStreamPos);
+ }
+ _readStreamPos = buf.empty() ? -1 : buf.b.position();
+ }
+
+ private void observerFinishRead(IceInternal.Buffer buf)
+ {
+ if(_readStreamPos == -1)
+ {
+ return;
+ }
+ assert (buf.b.position() >= _readStreamPos);
+ _observer.receivedBytes(buf.b.position() - _readStreamPos);
+ _readStreamPos = -1;
+ }
+
+ private void observerStartWrite(IceInternal.Buffer buf)
+ {
+ if(_writeStreamPos >= 0)
+ {
+ assert (!buf.empty());
+ _observer.sentBytes(buf.b.position() - _writeStreamPos);
+ }
+ _writeStreamPos = buf.empty() ? -1 : buf.b.position();
+ }
+
+ private void observerFinishWrite(IceInternal.Buffer buf)
+ {
+ if(_writeStreamPos == -1)
+ {
+ return;
+ }
+ if(buf.b.position() > _writeStreamPos)
+ {
+ _observer.sentBytes(buf.b.position() - _writeStreamPos);
+ }
+ _writeStreamPos = -1;
+ }
+
+ private IceInternal.Incoming getIncoming(ObjectAdapter adapter, boolean response, byte compress, int requestId)
+ {
+ IceInternal.Incoming in = null;
+
+ if(_cacheBuffers > 0)
+ {
+ synchronized(_incomingCacheMutex)
+ {
+ if(_incomingCache == null)
+ {
+ in = new IceInternal.Incoming(_instance, this, this, adapter, response, compress, requestId);
+ }
+ else
+ {
+ in = _incomingCache;
+ _incomingCache = _incomingCache.next;
+ in.reset(_instance, this, this, adapter, response, compress, requestId);
+ in.next = null;
+ }
+ }
+ }
+ else
+ {
+ in = new IceInternal.Incoming(_instance, this, this, adapter, response, compress, requestId);
+ }
+
+ return in;
+ }
+
+ private void reclaimIncoming(IceInternal.Incoming in)
+ {
+ if(_cacheBuffers > 0)
+ {
+ synchronized(_incomingCacheMutex)
+ {
+ in.next = _incomingCache;
+ _incomingCache = in;
+ //
+ // Clear references to Ice objects as soon as possible.
+ //
+ _incomingCache.reclaim();
+ }
+ }
+ }
+
+ private void reap()
+ {
+ if(_monitor != null)
+ {
+ _monitor.reap(this);
+ }
+ if(_observer != null)
+ {
+ _observer.detach();
+ }
+ }
+
+ private int read(IceInternal.Buffer buf)
+ {
+ int start = buf.b.position();
+ int op = _transceiver.read(buf);
+ if(_instance.traceLevels().network >= 3 && buf.b.position() != start)
+ {
+ StringBuffer s = new StringBuffer("received ");
+ if(_endpoint.datagram())
+ {
+ s.append(buf.b.limit());
+ }
+ else
+ {
+ s.append(buf.b.position() - start);
+ s.append(" of ");
+ s.append(buf.b.limit() - start);
+ }
+ s.append(" bytes via ");
+ s.append(_endpoint.protocol());
+ s.append("\n");
+ s.append(toString());
+
+ _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.toString());
+ }
+ return op;
+ }
+
+ private int write(IceInternal.Buffer buf)
+ {
+ int start = buf.b.position();
+ int op = _transceiver.write(buf);
+ if(_instance.traceLevels().network >= 3 && buf.b.position() != start)
+ {
+ StringBuffer s = new StringBuffer("sent ");
+ s.append(buf.b.position() - start);
+ if(!_endpoint.datagram())
+ {
+ s.append(" of ");
+ s.append(buf.b.limit() - start);
+ }
+ s.append(" bytes via ");
+ s.append(_endpoint.protocol());
+ s.append("\n");
+ s.append(toString());
+ _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.toString());
+ }
+ return op;
+ }
+
+ private static class OutgoingMessage
+ {
+ OutgoingMessage(OutputStream stream, boolean compress, boolean adopt)
+ {
+ this.stream = stream;
+ this.compress = compress;
+ this.adopt = adopt;
+ this.requestId = 0;
+ }
+
+ OutgoingMessage(IceInternal.OutgoingAsyncBase out, OutputStream stream, boolean compress,
+ int requestId)
+ {
+ this.stream = stream;
+ this.compress = compress;
+ this.outAsync = out;
+ this.requestId = requestId;
+ }
+
+ public void canceled()
+ {
+ assert (outAsync != null);
+ outAsync = null;
+ }
+
+ public void adopt()
+ {
+ if(adopt)
+ {
+ OutputStream stream =
+ new OutputStream(this.stream.instance(), IceInternal.Protocol.currentProtocolEncoding);
+ stream.swap(this.stream);
+ this.stream = stream;
+ adopt = false;
+ }
+ }
+
+ public boolean sent()
+ {
+ if(outAsync != null)
+ {
+ return outAsync.sent();
+ }
+ return false;
+ }
+
+ public void completed(Ice.LocalException ex)
+ {
+ if(outAsync != null && outAsync.completed(ex))
+ {
+ outAsync.invokeCompleted();
+ }
+ }
+
+ public OutputStream stream;
+ public IceInternal.OutgoingAsyncBase outAsync;
+ public boolean compress;
+ public int requestId;
+ boolean adopt;
+ boolean prepared;
+ }
+
+ private Communicator _communicator;
+ private final IceInternal.Instance _instance;
+ private IceInternal.ACMMonitor _monitor;
+ private final IceInternal.Transceiver _transceiver;
+ private String _desc;
+ private final String _type;
+ private final IceInternal.Connector _connector;
+ private final IceInternal.EndpointI _endpoint;
+
+ private ObjectAdapter _adapter;
+ private IceInternal.ServantManager _servantManager;
+
+ private final boolean _dispatcher;
+ private final Logger _logger;
+ private final IceInternal.TraceLevels _traceLevels;
+ private final IceInternal.ThreadPool _threadPool;
+
+ private final java.util.concurrent.ScheduledExecutorService _timer;
+ private final Runnable _writeTimeout;
+ private java.util.concurrent.Future<?> _writeTimeoutFuture;
+ private final Runnable _readTimeout;
+ private java.util.concurrent.Future<?> _readTimeoutFuture;
+
+ private StartCallback _startCallback = null;
+
+ private final boolean _warn;
+ private final boolean _warnUdp;
+
+ private long _acmLastActivity;
+
+ private final int _compressionLevel;
+
+ private int _nextRequestId;
+
+ private java.util.Map<Integer, IceInternal.OutgoingAsyncBase> _asyncRequests =
+ new java.util.HashMap<Integer, IceInternal.OutgoingAsyncBase>();
+
+ private LocalException _exception;
+
+ private final int _messageSizeMax;
+ private IceInternal.BatchRequestQueue _batchRequestQueue;
+
+ private java.util.LinkedList<OutgoingMessage> _sendStreams = new java.util.LinkedList<OutgoingMessage>();
+
+ private InputStream _readStream;
+ private boolean _readHeader;
+ private OutputStream _writeStream;
+
+ private Ice.Instrumentation.ConnectionObserver _observer;
+ private int _readStreamPos;
+ private int _writeStreamPos;
+
+ private int _dispatchCount;
+
+ private int _state; // The current state.
+ private boolean _shutdownInitiated = false;
+ private boolean _initialized = false;
+ private boolean _validated = false;
+
+ private IceInternal.Incoming _incomingCache;
+ private final java.lang.Object _incomingCacheMutex = new java.lang.Object();
+
+ private Ice.ProtocolVersion _readProtocol = new Ice.ProtocolVersion();
+ private Ice.EncodingVersion _readProtocolEncoding = new Ice.EncodingVersion();
+
+ private int _cacheBuffers;
+
+ private Ice.ConnectionInfo _info;
+
+ private CloseCallback _closeCallback;
+ private HeartbeatCallback _heartbeatCallback;
+
+ private static Ice.Instrumentation.ConnectionState connectionStateMap[] = {
+ Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized
+ Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotValidated
+ Ice.Instrumentation.ConnectionState.ConnectionStateActive, // StateActive
+ Ice.Instrumentation.ConnectionState.ConnectionStateHolding, // StateHolding
+ Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosing
+ Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosingPending
+ Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed
+ Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished
+ };
+
+}