diff options
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 213 |
1 files changed, 199 insertions, 14 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 1feeb7b34e0..8e0c99e9761 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -9,6 +9,8 @@ package Ice; +import Ice.Instrumentation.InvocationObserver; + public final class ConnectionI extends IceInternal.EventHandler implements Connection { public interface StartCallback @@ -248,6 +250,25 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } synchronized public void + updateObserver() + { + if(_state < StateNotValidated || _state > StateClosed) + { + return; + } + + assert(_instance.initializationData().observer != null); + _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(), + _endpoint, + toConnectionState(_state), + _observer); + if(_observer != null) + { + _observer.attach(); + } + } + + synchronized public void monitor(long now) { if(_state != StateActive) @@ -291,6 +312,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(_state > StateNotValidated); assert(_state < StateClosing); + out.attachRemoteObserver(initConnectionInfo(), _endpoint); + // // Ensure the message isn't bigger than what we can send with the // transport. @@ -365,6 +388,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(_state > StateNotValidated); assert(_state < StateClosing); + out.__attachRemoteObserver(initConnectionInfo(), _endpoint); + // // Ensure the message isn't bigger than what we can send with the // transport. @@ -615,7 +640,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public void flushBatchRequests() { - IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(this, _instance); + final InvocationObserver observer = IceInternal.ObserverHelper.get(_instance, __flushBatchRequests_name); + IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(this, _instance, observer); out.invoke(); } @@ -681,6 +707,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne throw (Ice.LocalException)_exception.fillInStackTrace(); } + out.attachRemoteObserver(initConnectionInfo(), _endpoint); + if(_batchRequestNum == 0) { out.sent(false); @@ -738,6 +766,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne throw (Ice.LocalException)_exception.fillInStackTrace(); } + outAsync.__attachRemoteObserver(initConnectionInfo(), _endpoint); + if(_batchRequestNum == 0) { int status = IceInternal.AsyncStatus.Sent; @@ -791,7 +821,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state == StateFinished) { - _reaper.add(this); + _reaper.add(this, _observer); } notifyAll(); } @@ -825,7 +855,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state == StateFinished) { - _reaper.add(this); + _reaper.add(this, _observer); } notifyAll(); } @@ -934,12 +964,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne unscheduleTimeout(current.operation); if((current.operation & IceInternal.SocketOperation.Write) != 0 && !_writeStream.isEmpty()) { + if(_observer != null) + { + observerStartWrite(_writeStream.pos()); + } if(!_transceiver.write(_writeStream.getBuffer())) { assert(!_writeStream.isEmpty()); scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); return; } + if(_observer != null) + { + observerFinishWrite(_writeStream.pos()); + } assert(!_writeStream.getBuffer().b.hasRemaining()); } if((current.operation & IceInternal.SocketOperation.Read) != 0 && !_readStream.isEmpty()) @@ -953,6 +991,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(!_readStream.getBuffer().b.hasRemaining()); _readHeader = false; + if(_observer != null) + { + _observer.receivedBytes(IceInternal.Protocol.headerSize); + } + int pos = _readStream.pos(); if(pos < IceInternal.Protocol.headerSize) { @@ -1008,12 +1051,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } else { + if(_observer != null) + { + observerStartRead(_readStream.pos()); + } if(!_transceiver.read(_readStream.getBuffer(), _hasMoreData)) { assert(!_readStream.isEmpty()); scheduleTimeout(IceInternal.SocketOperation.Read, _endpoint.timeout()); return; } + if(_observer != null) + { + observerFinishRead(_readStream.pos()); + } assert(!_readStream.getBuffer().b.hasRemaining()); } } @@ -1235,7 +1286,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } else if(_state == StateFinished) { - _reaper.add(this); + _reaper.add(this, _observer); } notifyAll(); } @@ -1361,7 +1412,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne setState(StateFinished); if(_dispatchCount == 0) { - _reaper.add(this); + _reaper.add(this, _observer); } } } @@ -1420,10 +1471,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { throw (Ice.LocalException)_exception.fillInStackTrace(); } - ConnectionInfo info = _transceiver.getInfo(); - info.adapterName = _adapter != null ? _adapter.getName() : ""; - info.incoming = _connector == null; - return info; + return initConnectionInfo(); } public String @@ -1457,7 +1505,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state == StateFinished) { - _reaper.add(this); + _reaper.add(this, _observer); } notifyAll(); } @@ -1781,6 +1829,34 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } + if(_instance.initializationData().observer != null) + { + Ice.Instrumentation.ConnectionState oldState = toConnectionState(_state); + Ice.Instrumentation.ConnectionState newState = toConnectionState(state); + if(oldState != newState) + { + _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(), + _endpoint, + newState, + _observer); + if(_observer != null) + { + _observer.attach(); + } + } + if(_observer != null && state == StateClosed && _exception != null) + { + if(!(_exception instanceof CloseConnectionException || + _exception instanceof ForcedCloseConnectionException || + _exception instanceof ConnectionTimeoutException || + _exception instanceof CommunicatorDestroyedException || + _exception instanceof ObjectAdapterDeactivatedException || + (_exception instanceof ConnectionLostException && _state == StateClosing))) + { + _observer.failed(_exception.ice_name()); + } + } + } _state = state; notifyAll(); @@ -1882,12 +1958,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _writeStream.prepareWrite(); } + if(_observer != null) + { + observerStartWrite(_writeStream.pos()); + } if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer())) { scheduleTimeout(IceInternal.SocketOperation.Write, connectTimeout()); _threadPool.update(this, operation, IceInternal.SocketOperation.Write); return false; } + if(_observer != null) + { + observerFinishWrite(_writeStream.pos()); + } } else // The client side has the passive role for connection validation. { @@ -1897,12 +1981,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _readStream.pos(0); } + if(_observer != null) + { + observerStartRead(_readStream.pos()); + } if(_readStream.pos() != _readStream.size() && !_transceiver.read(_readStream.getBuffer(), _hasMoreData)) { scheduleTimeout(IceInternal.SocketOperation.Read, connectTimeout()); _threadPool.update(this, operation, IceInternal.SocketOperation.Read); return false; } + if(_observer != null) + { + observerFinishRead(_readStream.pos()); + } assert(_readStream.pos() == IceInternal.Protocol.headerSize); _readStream.pos(0); @@ -2008,12 +2100,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // Send the message. // + if(_observer != null) + { + observerStartWrite(_writeStream.pos()); + } if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer())) { assert(!_writeStream.isEmpty()); scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); return callbacks; } + if(_observer != null) + { + observerFinishWrite(_writeStream.pos()); + } } } catch(Ice.LocalException ex) @@ -2071,8 +2171,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels); } + if(_observer != null) + { + observerStartWrite(message.stream.pos()); + } if(_transceiver.write(message.stream.getBuffer())) { + if(_observer != null) + { + observerFinishWrite(message.stream.pos()); + } int status = IceInternal.AsyncStatus.Sent; if(message.sent(this, false)) { @@ -2459,6 +2567,31 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } + private ConnectionInfo + initConnectionInfo() + { + if(_info != null) + { + return _info; + } + + ConnectionInfo info = _transceiver.getInfo(); + info.connectionId = _endpoint.connectionId(); + info.adapterName = _adapter != null ? _adapter.getName() : ""; + info.incoming = _connector == null; + if(_state > StateNotInitialized) + { + _info = info; // Cache the connection information only if initialized. + } + return info; + } + + private Ice.Instrumentation.ConnectionState + toConnectionState(int state) + { + return connectionStateMap[state]; + } + private void warning(String msg, Exception ex) { @@ -2470,6 +2603,42 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _logger.warning(s); } + private void + observerStartRead(int pos) + { + if(_readStreamPos > 0) + { + _observer.receivedBytes(pos - _readStreamPos); + } + _readStreamPos = pos; + } + + private void + observerFinishRead(int pos) + { + assert(pos >= _readStreamPos); + _observer.receivedBytes(pos - _readStreamPos); + _readStreamPos = 0; + } + + private void + observerStartWrite(int pos) + { + if(_writeStreamPos > 0) + { + _observer.sentBytes(pos - _writeStreamPos); + } + _writeStreamPos = pos; + } + + private void + observerFinishWrite(int pos) + { + assert(pos >= _writeStreamPos); + _observer.sentBytes(pos - _writeStreamPos); + _writeStreamPos = 0; + } + private IceInternal.Incoming getIncoming(ObjectAdapter adapter, boolean response, byte compress, int requestId) { @@ -2519,7 +2688,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public IceInternal.Outgoing getOutgoing(IceInternal.RequestHandler handler, String operation, OperationMode mode, - java.util.Map<String, String> context) + java.util.Map<String, String> context, InvocationObserver observer) throws IceInternal.LocalExceptionWrapper { IceInternal.Outgoing out = null; @@ -2530,20 +2699,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_outgoingCache == null) { - out = new IceInternal.Outgoing(handler, operation, mode, context); + out = new IceInternal.Outgoing(handler, operation, mode, context, observer); } else { out = _outgoingCache; _outgoingCache = _outgoingCache.next; - out.reset(handler, operation, mode, context); + out.reset(handler, operation, mode, context, observer); out.next = null; } } } else { - out = new IceInternal.Outgoing(handler, operation, mode, context); + out = new IceInternal.Outgoing(handler, operation, mode, context, observer); } return out; @@ -2709,6 +2878,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private boolean _readHeader; private IceInternal.BasicStream _writeStream; + private Ice.Instrumentation.ConnectionObserver _observer; + private int _readStreamPos; + private int _writeStreamPos; + private int _dispatchCount; private int _state; // The current state. @@ -2724,4 +2897,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private Ice.EncodingVersion _readProtocolEncoding = new Ice.EncodingVersion(); private int _cacheBuffers; + + private Ice.ConnectionInfo _info; + + private static Ice.Instrumentation.ConnectionState connectionStateMap[] = { + Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized + Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotValidated + Ice.Instrumentation.ConnectionState.ConnectionStateActive, // StateActive + Ice.Instrumentation.ConnectionState.ConnectionStateHolding, // StateHolding + Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosing + Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed + Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished + }; } |