diff options
Diffstat (limited to 'java/src')
49 files changed, 3732 insertions, 354 deletions
diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java index 8450ccf1ffa..e70aac96a0b 100644 --- a/java/src/Ice/AsyncResult.java +++ b/java/src/Ice/AsyncResult.java @@ -295,6 +295,12 @@ public class AsyncResult __error(exc); } } + + if(_observer != null) + { + _observer.detach(); + _observer = null; + } } protected final void __sentInternal() @@ -325,6 +331,19 @@ public class AsyncResult } } + public void + __attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt) + { + if(_observer != null) + { + _remoteObserver = _observer.getRemoteObserver(info, endpt); + if(_remoteObserver != null) + { + _remoteObserver.attach(); + } + } + } + public final void __sentAsync() { // @@ -420,6 +439,12 @@ public class AsyncResult __error(exc); } } + + if(_observer != null) + { + _observer.detach(); + _observer = null; + } } protected final void __warning(RuntimeException ex) @@ -454,5 +479,8 @@ public class AsyncResult protected boolean _sentSynchronously; protected LocalException _exception; + protected Ice.Instrumentation.InvocationObserver _observer; + protected Ice.Instrumentation.Observer _remoteObserver; + private IceInternal.CallbackBase _callback; } diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 1feeb7b34e0..8e0c99e9761 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -9,6 +9,8 @@ package Ice; +import Ice.Instrumentation.InvocationObserver; + public final class ConnectionI extends IceInternal.EventHandler implements Connection { public interface StartCallback @@ -248,6 +250,25 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } synchronized public void + updateObserver() + { + if(_state < StateNotValidated || _state > StateClosed) + { + return; + } + + assert(_instance.initializationData().observer != null); + _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(), + _endpoint, + toConnectionState(_state), + _observer); + if(_observer != null) + { + _observer.attach(); + } + } + + synchronized public void monitor(long now) { if(_state != StateActive) @@ -291,6 +312,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(_state > StateNotValidated); assert(_state < StateClosing); + out.attachRemoteObserver(initConnectionInfo(), _endpoint); + // // Ensure the message isn't bigger than what we can send with the // transport. @@ -365,6 +388,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(_state > StateNotValidated); assert(_state < StateClosing); + out.__attachRemoteObserver(initConnectionInfo(), _endpoint); + // // Ensure the message isn't bigger than what we can send with the // transport. @@ -615,7 +640,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public void flushBatchRequests() { - IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(this, _instance); + final InvocationObserver observer = IceInternal.ObserverHelper.get(_instance, __flushBatchRequests_name); + IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(this, _instance, observer); out.invoke(); } @@ -681,6 +707,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne throw (Ice.LocalException)_exception.fillInStackTrace(); } + out.attachRemoteObserver(initConnectionInfo(), _endpoint); + if(_batchRequestNum == 0) { out.sent(false); @@ -738,6 +766,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne throw (Ice.LocalException)_exception.fillInStackTrace(); } + outAsync.__attachRemoteObserver(initConnectionInfo(), _endpoint); + if(_batchRequestNum == 0) { int status = IceInternal.AsyncStatus.Sent; @@ -791,7 +821,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state == StateFinished) { - _reaper.add(this); + _reaper.add(this, _observer); } notifyAll(); } @@ -825,7 +855,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state == StateFinished) { - _reaper.add(this); + _reaper.add(this, _observer); } notifyAll(); } @@ -934,12 +964,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne unscheduleTimeout(current.operation); if((current.operation & IceInternal.SocketOperation.Write) != 0 && !_writeStream.isEmpty()) { + if(_observer != null) + { + observerStartWrite(_writeStream.pos()); + } if(!_transceiver.write(_writeStream.getBuffer())) { assert(!_writeStream.isEmpty()); scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); return; } + if(_observer != null) + { + observerFinishWrite(_writeStream.pos()); + } assert(!_writeStream.getBuffer().b.hasRemaining()); } if((current.operation & IceInternal.SocketOperation.Read) != 0 && !_readStream.isEmpty()) @@ -953,6 +991,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(!_readStream.getBuffer().b.hasRemaining()); _readHeader = false; + if(_observer != null) + { + _observer.receivedBytes(IceInternal.Protocol.headerSize); + } + int pos = _readStream.pos(); if(pos < IceInternal.Protocol.headerSize) { @@ -1008,12 +1051,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } else { + if(_observer != null) + { + observerStartRead(_readStream.pos()); + } if(!_transceiver.read(_readStream.getBuffer(), _hasMoreData)) { assert(!_readStream.isEmpty()); scheduleTimeout(IceInternal.SocketOperation.Read, _endpoint.timeout()); return; } + if(_observer != null) + { + observerFinishRead(_readStream.pos()); + } assert(!_readStream.getBuffer().b.hasRemaining()); } } @@ -1235,7 +1286,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } else if(_state == StateFinished) { - _reaper.add(this); + _reaper.add(this, _observer); } notifyAll(); } @@ -1361,7 +1412,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne setState(StateFinished); if(_dispatchCount == 0) { - _reaper.add(this); + _reaper.add(this, _observer); } } } @@ -1420,10 +1471,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { throw (Ice.LocalException)_exception.fillInStackTrace(); } - ConnectionInfo info = _transceiver.getInfo(); - info.adapterName = _adapter != null ? _adapter.getName() : ""; - info.incoming = _connector == null; - return info; + return initConnectionInfo(); } public String @@ -1457,7 +1505,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state == StateFinished) { - _reaper.add(this); + _reaper.add(this, _observer); } notifyAll(); } @@ -1781,6 +1829,34 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } + if(_instance.initializationData().observer != null) + { + Ice.Instrumentation.ConnectionState oldState = toConnectionState(_state); + Ice.Instrumentation.ConnectionState newState = toConnectionState(state); + if(oldState != newState) + { + _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(), + _endpoint, + newState, + _observer); + if(_observer != null) + { + _observer.attach(); + } + } + if(_observer != null && state == StateClosed && _exception != null) + { + if(!(_exception instanceof CloseConnectionException || + _exception instanceof ForcedCloseConnectionException || + _exception instanceof ConnectionTimeoutException || + _exception instanceof CommunicatorDestroyedException || + _exception instanceof ObjectAdapterDeactivatedException || + (_exception instanceof ConnectionLostException && _state == StateClosing))) + { + _observer.failed(_exception.ice_name()); + } + } + } _state = state; notifyAll(); @@ -1882,12 +1958,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _writeStream.prepareWrite(); } + if(_observer != null) + { + observerStartWrite(_writeStream.pos()); + } if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer())) { scheduleTimeout(IceInternal.SocketOperation.Write, connectTimeout()); _threadPool.update(this, operation, IceInternal.SocketOperation.Write); return false; } + if(_observer != null) + { + observerFinishWrite(_writeStream.pos()); + } } else // The client side has the passive role for connection validation. { @@ -1897,12 +1981,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _readStream.pos(0); } + if(_observer != null) + { + observerStartRead(_readStream.pos()); + } if(_readStream.pos() != _readStream.size() && !_transceiver.read(_readStream.getBuffer(), _hasMoreData)) { scheduleTimeout(IceInternal.SocketOperation.Read, connectTimeout()); _threadPool.update(this, operation, IceInternal.SocketOperation.Read); return false; } + if(_observer != null) + { + observerFinishRead(_readStream.pos()); + } assert(_readStream.pos() == IceInternal.Protocol.headerSize); _readStream.pos(0); @@ -2008,12 +2100,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // Send the message. // + if(_observer != null) + { + observerStartWrite(_writeStream.pos()); + } if(_writeStream.pos() != _writeStream.size() && !_transceiver.write(_writeStream.getBuffer())) { assert(!_writeStream.isEmpty()); scheduleTimeout(IceInternal.SocketOperation.Write, _endpoint.timeout()); return callbacks; } + if(_observer != null) + { + observerFinishWrite(_writeStream.pos()); + } } } catch(Ice.LocalException ex) @@ -2071,8 +2171,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels); } + if(_observer != null) + { + observerStartWrite(message.stream.pos()); + } if(_transceiver.write(message.stream.getBuffer())) { + if(_observer != null) + { + observerFinishWrite(message.stream.pos()); + } int status = IceInternal.AsyncStatus.Sent; if(message.sent(this, false)) { @@ -2459,6 +2567,31 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } + private ConnectionInfo + initConnectionInfo() + { + if(_info != null) + { + return _info; + } + + ConnectionInfo info = _transceiver.getInfo(); + info.connectionId = _endpoint.connectionId(); + info.adapterName = _adapter != null ? _adapter.getName() : ""; + info.incoming = _connector == null; + if(_state > StateNotInitialized) + { + _info = info; // Cache the connection information only if initialized. + } + return info; + } + + private Ice.Instrumentation.ConnectionState + toConnectionState(int state) + { + return connectionStateMap[state]; + } + private void warning(String msg, Exception ex) { @@ -2470,6 +2603,42 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _logger.warning(s); } + private void + observerStartRead(int pos) + { + if(_readStreamPos > 0) + { + _observer.receivedBytes(pos - _readStreamPos); + } + _readStreamPos = pos; + } + + private void + observerFinishRead(int pos) + { + assert(pos >= _readStreamPos); + _observer.receivedBytes(pos - _readStreamPos); + _readStreamPos = 0; + } + + private void + observerStartWrite(int pos) + { + if(_writeStreamPos > 0) + { + _observer.sentBytes(pos - _writeStreamPos); + } + _writeStreamPos = pos; + } + + private void + observerFinishWrite(int pos) + { + assert(pos >= _writeStreamPos); + _observer.sentBytes(pos - _writeStreamPos); + _writeStreamPos = 0; + } + private IceInternal.Incoming getIncoming(ObjectAdapter adapter, boolean response, byte compress, int requestId) { @@ -2519,7 +2688,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public IceInternal.Outgoing getOutgoing(IceInternal.RequestHandler handler, String operation, OperationMode mode, - java.util.Map<String, String> context) + java.util.Map<String, String> context, InvocationObserver observer) throws IceInternal.LocalExceptionWrapper { IceInternal.Outgoing out = null; @@ -2530,20 +2699,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_outgoingCache == null) { - out = new IceInternal.Outgoing(handler, operation, mode, context); + out = new IceInternal.Outgoing(handler, operation, mode, context, observer); } else { out = _outgoingCache; _outgoingCache = _outgoingCache.next; - out.reset(handler, operation, mode, context); + out.reset(handler, operation, mode, context, observer); out.next = null; } } } else { - out = new IceInternal.Outgoing(handler, operation, mode, context); + out = new IceInternal.Outgoing(handler, operation, mode, context, observer); } return out; @@ -2709,6 +2878,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private boolean _readHeader; private IceInternal.BasicStream _writeStream; + private Ice.Instrumentation.ConnectionObserver _observer; + private int _readStreamPos; + private int _writeStreamPos; + private int _dispatchCount; private int _state; // The current state. @@ -2724,4 +2897,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private Ice.EncodingVersion _readProtocolEncoding = new Ice.EncodingVersion(); private int _cacheBuffers; + + private Ice.ConnectionInfo _info; + + private static Ice.Instrumentation.ConnectionState connectionStateMap[] = { + Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized + Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotValidated + Ice.Instrumentation.ConnectionState.ConnectionStateActive, // StateActive + Ice.Instrumentation.ConnectionState.ConnectionStateHolding, // StateHolding + Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosing + Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed + Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished + }; } diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java index fb19f7fb234..f08532fc6f4 100644 --- a/java/src/Ice/ObjectAdapterI.java +++ b/java/src/Ice/ObjectAdapterI.java @@ -745,6 +745,34 @@ public final class ObjectAdapterI implements ObjectAdapter } } + public void + updateConnectionObservers() + { + java.util.List<IceInternal.IncomingConnectionFactory> f; + synchronized(this) + { + f = new java.util.ArrayList<IceInternal.IncomingConnectionFactory>(_incomingConnectionFactories); + } + for(IceInternal.IncomingConnectionFactory p : f) + { + p.updateConnectionObservers(); + } + } + + public void + updateThreadObservers() + { + IceInternal.ThreadPool threadPool = null; + synchronized(this) + { + threadPool = _threadPool; + } + if(threadPool != null) + { + threadPool.updateObservers(); + } + } + public synchronized void incDirectCount() { @@ -785,7 +813,7 @@ public final class ObjectAdapterI implements ObjectAdapter } else { - return _instance.serverThreadPool(); + return _instance.serverThreadPool(true); } } diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index fd6deac1100..032e41c50e3 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -9,6 +9,8 @@ package Ice; +import Ice.Instrumentation.InvocationObserver; + /** * Base class for all proxies. **/ @@ -91,23 +93,34 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable __context = _emptyContext; } + final InvocationObserver __observer = IceInternal.ObserverHelper.get(this, __ice_isA_name, __context); int __cnt = 0; - while(true) + try { - _ObjectDel __del = null; - try + while(true) { - __checkTwowayOnly(__ice_isA_name); - __del = __getDelegate(false); - return __del.ice_isA(__id, __context); - } - catch(IceInternal.LocalExceptionWrapper __ex) - { - __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt); + _ObjectDel __del = null; + try + { + __checkTwowayOnly(__ice_isA_name); + __del = __getDelegate(false); + return __del.ice_isA(__id, __context, __observer); + } + catch(IceInternal.LocalExceptionWrapper __ex) + { + __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt, __observer); + } + catch(LocalException __ex) + { + __cnt = __handleException(__del, __ex, null, __cnt, __observer); + } } - catch(LocalException __ex) + } + finally + { + if(__observer != null) { - __cnt = __handleException(__del, __ex, null, __cnt); + __observer.detach(); } } } @@ -270,23 +283,34 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable __context = _emptyContext; } + final InvocationObserver __observer = IceInternal.ObserverHelper.get(this, __ice_ping_name, __context); int __cnt = 0; - while(true) + try { - _ObjectDel __del = null; - try + while(true) { - __del = __getDelegate(false); - __del.ice_ping(__context); - return; - } - catch(IceInternal.LocalExceptionWrapper __ex) - { - __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt); + _ObjectDel __del = null; + try + { + __del = __getDelegate(false); + __del.ice_ping(__context, __observer); + return; + } + catch(IceInternal.LocalExceptionWrapper __ex) + { + __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt, __observer); + } + catch(LocalException __ex) + { + __cnt = __handleException(__del, __ex, null, __cnt, __observer); + } } - catch(LocalException __ex) + } + finally + { + if(__observer != null) { - __cnt = __handleException(__del, __ex, null, __cnt); + __observer.detach(); } } } @@ -427,23 +451,34 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable __context = _emptyContext; } + final InvocationObserver __observer = IceInternal.ObserverHelper.get(this, __ice_ids_name, __context); int __cnt = 0; - while(true) + try { - _ObjectDel __del = null; - try + while(true) { - __checkTwowayOnly(__ice_ids_name); - __del = __getDelegate(false); - return __del.ice_ids(__context); - } - catch(IceInternal.LocalExceptionWrapper __ex) - { - __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt); + _ObjectDel __del = null; + try + { + __checkTwowayOnly(__ice_ids_name); + __del = __getDelegate(false); + return __del.ice_ids(__context, __observer); + } + catch(IceInternal.LocalExceptionWrapper __ex) + { + __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt, __observer); + } + catch(LocalException __ex) + { + __cnt = __handleException(__del, __ex, null, __cnt, __observer); + } } - catch(LocalException __ex) + } + finally + { + if(__observer != null) { - __cnt = __handleException(__del, __ex, null, __cnt); + __observer.detach(); } } } @@ -601,23 +636,34 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable __context = _emptyContext; } + final InvocationObserver __observer = IceInternal.ObserverHelper.get(this, __ice_id_name, __context); int __cnt = 0; - while(true) + try { - _ObjectDel __del = null; - try + while(true) { - __checkTwowayOnly(__ice_id_name); - __del = __getDelegate(false); - return __del.ice_id(__context); - } - catch(IceInternal.LocalExceptionWrapper __ex) - { - __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt); + _ObjectDel __del = null; + try + { + __checkTwowayOnly(__ice_id_name); + __del = __getDelegate(false); + return __del.ice_id(__context, __observer); + } + catch(IceInternal.LocalExceptionWrapper __ex) + { + __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt, __observer); + } + catch(LocalException __ex) + { + __cnt = __handleException(__del, __ex, null, __cnt, __observer); + } } - catch(LocalException __ex) + } + finally + { + if(__observer != null) { - __cnt = __handleException(__del, __ex, null, __cnt); + __observer.detach(); } } } @@ -796,29 +842,40 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable context = _emptyContext; } + final InvocationObserver __observer = IceInternal.ObserverHelper.get(this, __ice_invoke_name, context); int __cnt = 0; - while(true) + try { - _ObjectDel __del = null; - try + while(true) { - __del = __getDelegate(false); - return __del.ice_invoke(operation, mode, inParams, outParams, context); - } - catch(IceInternal.LocalExceptionWrapper __ex) - { - if(mode == OperationMode.Nonmutating || mode == OperationMode.Idempotent) + _ObjectDel __del = null; + try { - __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt); + __del = __getDelegate(false); + return __del.ice_invoke(operation, mode, inParams, outParams, context, __observer); } - else + catch(IceInternal.LocalExceptionWrapper __ex) { - __handleExceptionWrapper(__del, __ex); + if(mode == OperationMode.Nonmutating || mode == OperationMode.Idempotent) + { + __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt, __observer); + } + else + { + __handleExceptionWrapper(__del, __ex, __observer); + } + } + catch(LocalException __ex) + { + __cnt = __handleException(__del, __ex, null, __cnt, __observer); } } - catch(LocalException __ex) + } + finally + { + if(__observer != null) { - __cnt = __handleException(__del, __ex, null, __cnt); + __observer.detach(); } } } @@ -1711,19 +1768,31 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final Connection ice_getConnection() { + final InvocationObserver __observer = IceInternal.ObserverHelper.get(this, "ice_getConnection"); int __cnt = 0; - while(true) + try { - _ObjectDel __del = null; - try + while(true) { - __del = __getDelegate(false); - return __del.__getRequestHandler().getConnection(true); // Wait for the connection to be established. - + _ObjectDel __del = null; + try + { + __del = __getDelegate(false); + // Wait for the connection to be established. + return __del.__getRequestHandler().getConnection(true); + + } + catch(LocalException __ex) + { + __cnt = __handleException(__del, __ex, null, __cnt, __observer); + } } - catch(LocalException __ex) + } + finally + { + if(__observer != null) { - __cnt = __handleException(__del, __ex, null, __cnt); + __observer.detach(); } } } @@ -1772,17 +1841,18 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch // requests were queued with the connection, they would be lost without being noticed. // + final InvocationObserver __observer = IceInternal.ObserverHelper.get(this, __ice_flushBatchRequests_name); _ObjectDel __del = null; int __cnt = -1; // Don't retry. try { __del = __getDelegate(false); - __del.ice_flushBatchRequests(); + __del.ice_flushBatchRequests(__observer); return; } catch(LocalException __ex) { - __cnt = __handleException(__del, __ex, null, __cnt); + __cnt = __handleException(__del, __ex, null, __cnt, __observer); } } @@ -1953,7 +2023,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } public final int - __handleException(_ObjectDel delegate, LocalException ex, Ice.IntHolder interval, int cnt) + __handleException(_ObjectDel delegate, LocalException ex, Ice.IntHolder interval, int cnt, InvocationObserver obsv) { // // Only _delegate needs to be mutex protected here. @@ -1966,28 +2036,44 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } } - if(cnt == -1) // Don't retry if the retry count is -1. - { - throw ex; - } - try { - return _reference.getInstance().proxyFactory().checkRetryAfterException(ex, _reference, interval, cnt); + if(cnt == -1) // Don't retry if the retry count is -1. + { + throw ex; + } + + try + { + cnt = _reference.getInstance().proxyFactory().checkRetryAfterException(ex, _reference, interval, + cnt); + } + catch(CommunicatorDestroyedException e) + { + // + // The communicator is already destroyed, so we cannot + // retry. + // + throw e; + } + if(obsv != null) + { + obsv.retried(); + } + return cnt; } - catch(CommunicatorDestroyedException e) + catch(Ice.LocalException e) { - // - // The communicator is already destroyed, so we cannot - // retry. - // - throw ex; + if(obsv != null) + { + obsv.failed(e.ice_name()); + } + throw e; } - } public final void - __handleExceptionWrapper(_ObjectDel delegate, IceInternal.LocalExceptionWrapper ex) + __handleExceptionWrapper(_ObjectDel delegate, IceInternal.LocalExceptionWrapper ex, InvocationObserver obsv) { synchronized(this) { @@ -1999,19 +2085,28 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable if(!ex.retry()) { + if(obsv != null) + { + obsv.failed(ex.get().ice_name()); + } throw ex.get(); } + else if(obsv != null) + { + obsv.retried(); + } } public final int __handleExceptionWrapperRelaxed(_ObjectDel delegate, IceInternal.LocalExceptionWrapper ex, Ice.IntHolder interval, - int cnt) + int cnt, + InvocationObserver obsv) { if(!ex.retry()) { - return __handleException(delegate, ex.get(), interval, cnt); + return __handleException(delegate, ex.get(), interval, cnt, obsv); } else { @@ -2022,7 +2117,10 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable _delegate = null; } } - + if(obsv != null) + { + obsv.retried(); + } return cnt; } } @@ -2202,10 +2300,10 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable private void writeObject(java.io.ObjectOutputStream out) - throws java.io.IOException - { - out.writeUTF(toString()); - } + throws java.io.IOException + { + out.writeUTF(toString()); + } private void readObject(java.io.ObjectInputStream in) diff --git a/java/src/Ice/_ObjectDel.java b/java/src/Ice/_ObjectDel.java index e18d35d2ab6..8da4dcf959f 100644 --- a/java/src/Ice/_ObjectDel.java +++ b/java/src/Ice/_ObjectDel.java @@ -9,25 +9,27 @@ package Ice; +import Ice.Instrumentation.InvocationObserver; + public interface _ObjectDel { - boolean ice_isA(String id, java.util.Map<String, String> context) + boolean ice_isA(String id, java.util.Map<String, String> context, InvocationObserver obsv) throws IceInternal.LocalExceptionWrapper; - void ice_ping(java.util.Map<String, String> context) + void ice_ping(java.util.Map<String, String> context, InvocationObserver obsv) throws IceInternal.LocalExceptionWrapper; - String[] ice_ids(java.util.Map<String, String> context) + String[] ice_ids(java.util.Map<String, String> context, InvocationObserver obsv) throws IceInternal.LocalExceptionWrapper; - String ice_id(java.util.Map<String, String> context) + String ice_id(java.util.Map<String, String> context, InvocationObserver obsv) throws IceInternal.LocalExceptionWrapper; boolean ice_invoke(String operation, Ice.OperationMode mode, byte[] inParams, ByteSeqHolder outParams, - java.util.Map<String, String> context) + java.util.Map<String, String> context, InvocationObserver obsv) throws IceInternal.LocalExceptionWrapper; - void ice_flushBatchRequests(); + void ice_flushBatchRequests(InvocationObserver obsv); IceInternal.RequestHandler __getRequestHandler(); void __setRequestHandler(IceInternal.RequestHandler handler); diff --git a/java/src/Ice/_ObjectDelD.java b/java/src/Ice/_ObjectDelD.java index 6c702ca67ba..e283006e733 100644 --- a/java/src/Ice/_ObjectDelD.java +++ b/java/src/Ice/_ObjectDelD.java @@ -9,10 +9,12 @@ package Ice; +import Ice.Instrumentation.InvocationObserver; + public class _ObjectDelD implements _ObjectDel { public boolean - ice_isA(final String __id, java.util.Map<String, String> __context) + ice_isA(final String __id, java.util.Map<String, String> __context, InvocationObserver __observer) throws IceInternal.LocalExceptionWrapper { final Current __current = new Current(); @@ -62,7 +64,7 @@ public class _ObjectDelD implements _ObjectDel } public void - ice_ping(java.util.Map<String, String> __context) + ice_ping(java.util.Map<String, String> __context, InvocationObserver __observer) throws IceInternal.LocalExceptionWrapper { final Current __current = new Current(); @@ -109,7 +111,7 @@ public class _ObjectDelD implements _ObjectDel } public String[] - ice_ids(java.util.Map<String, String> __context) + ice_ids(java.util.Map<String, String> __context, InvocationObserver __observer) throws IceInternal.LocalExceptionWrapper { final Current __current = new Current(); @@ -160,7 +162,7 @@ public class _ObjectDelD implements _ObjectDel } public String - ice_id(java.util.Map<String, String> __context) + ice_id(java.util.Map<String, String> __context, InvocationObserver __observer) throws IceInternal.LocalExceptionWrapper { final Current __current = new Current(); @@ -211,13 +213,13 @@ public class _ObjectDelD implements _ObjectDel public boolean ice_invoke(String operation, Ice.OperationMode mode, byte[] inParams, ByteSeqHolder outParams, - java.util.Map<String, String> context) + java.util.Map<String, String> context, InvocationObserver observer) { throw new CollocationOptimizationException(); } public void - ice_flushBatchRequests() + ice_flushBatchRequests(InvocationObserver observer) { throw new CollocationOptimizationException(); } diff --git a/java/src/Ice/_ObjectDelM.java b/java/src/Ice/_ObjectDelM.java index 398eed35a11..9c1cfdfe468 100644 --- a/java/src/Ice/_ObjectDelM.java +++ b/java/src/Ice/_ObjectDelM.java @@ -9,13 +9,15 @@ package Ice; +import Ice.Instrumentation.InvocationObserver; + public class _ObjectDelM implements _ObjectDel { public boolean - ice_isA(String __id, java.util.Map<String, String> __context) + ice_isA(String __id, java.util.Map<String, String> __context, InvocationObserver __observer) throws IceInternal.LocalExceptionWrapper { - IceInternal.Outgoing __og = __handler.getOutgoing("ice_isA", OperationMode.Nonmutating, __context); + IceInternal.Outgoing __og = __handler.getOutgoing("ice_isA", OperationMode.Nonmutating, __context, __observer); try { try @@ -59,10 +61,10 @@ public class _ObjectDelM implements _ObjectDel } public void - ice_ping(java.util.Map<String, String> __context) + ice_ping(java.util.Map<String, String> __context, InvocationObserver __observer) throws IceInternal.LocalExceptionWrapper { - IceInternal.Outgoing __og = __handler.getOutgoing("ice_ping", OperationMode.Nonmutating, __context); + IceInternal.Outgoing __og = __handler.getOutgoing("ice_ping", OperationMode.Nonmutating, __context, __observer); try { __og.writeEmptyParams(); @@ -97,10 +99,10 @@ public class _ObjectDelM implements _ObjectDel } public String[] - ice_ids(java.util.Map<String, String> __context) + ice_ids(java.util.Map<String, String> __context, InvocationObserver __observer) throws IceInternal.LocalExceptionWrapper { - IceInternal.Outgoing __og = __handler.getOutgoing("ice_ids", OperationMode.Nonmutating, __context); + IceInternal.Outgoing __og = __handler.getOutgoing("ice_ids", OperationMode.Nonmutating, __context, __observer); try { __og.writeEmptyParams(); @@ -135,10 +137,10 @@ public class _ObjectDelM implements _ObjectDel } public String - ice_id(java.util.Map<String, String> __context) + ice_id(java.util.Map<String, String> __context, InvocationObserver __observer) throws IceInternal.LocalExceptionWrapper { - IceInternal.Outgoing __og = __handler.getOutgoing("ice_id", OperationMode.Nonmutating, __context); + IceInternal.Outgoing __og = __handler.getOutgoing("ice_id", OperationMode.Nonmutating, __context, __observer); try { __og.writeEmptyParams(); @@ -173,10 +175,11 @@ public class _ObjectDelM implements _ObjectDel } public boolean - ice_invoke(String operation, OperationMode mode, byte[] inParams, ByteSeqHolder outParams, java.util.Map<String, String> __context) + ice_invoke(String operation, OperationMode mode, byte[] inParams, ByteSeqHolder outParams, + java.util.Map<String, String> __context, InvocationObserver __observer) throws IceInternal.LocalExceptionWrapper { - IceInternal.Outgoing __og = __handler.getOutgoing(operation, mode, __context); + IceInternal.Outgoing __og = __handler.getOutgoing(operation, mode, __context, __observer); try { __og.writeParamEncaps(inParams); @@ -204,9 +207,9 @@ public class _ObjectDelM implements _ObjectDel } public void - ice_flushBatchRequests() + ice_flushBatchRequests(InvocationObserver observer) { - IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(__handler); + IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(__handler, observer); out.invoke(); } diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java index f1dad286f3d..8dfeefcf49d 100644 --- a/java/src/IceInternal/BatchOutgoing.java +++ b/java/src/IceInternal/BatchOutgoing.java @@ -9,22 +9,27 @@ package IceInternal; +import Ice.Instrumentation.Observer; +import Ice.Instrumentation.InvocationObserver; + public final class BatchOutgoing implements OutgoingMessageCallback { public - BatchOutgoing(Ice.ConnectionI connection, Instance instance) + BatchOutgoing(Ice.ConnectionI connection, Instance instance, InvocationObserver observer) { _connection = connection; _sent = false; _os = new BasicStream(instance, Protocol.currentProtocolEncoding); + _observer = observer; } public - BatchOutgoing(RequestHandler handler) + BatchOutgoing(RequestHandler handler, InvocationObserver observer) { _handler = handler; _sent = false; _os = new BasicStream(handler.getReference().getInstance(), Protocol.currentProtocolEncoding); + _observer = observer; } public void @@ -48,6 +53,11 @@ public final class BatchOutgoing implements OutgoingMessageCallback } } + if(_remoteObserver != null) + { + _remoteObserver.detach(); + _remoteObserver = null; + } if(_exception != null) { throw _exception; @@ -86,9 +96,26 @@ public final class BatchOutgoing implements OutgoingMessageCallback return _os; } + public void + attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt) + { + if(_observer != null) + { + _remoteObserver = _observer.getRemoteObserver(info, endpt); + if(_remoteObserver != null) + { + _remoteObserver.attach(); + } + } + } + private RequestHandler _handler; private Ice.ConnectionI _connection; private BasicStream _os; private boolean _sent; private Ice.LocalException _exception; + + private InvocationObserver _observer; + private Observer _remoteObserver; + } diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java index 7c5d3f6fc66..6278faf9412 100644 --- a/java/src/IceInternal/BatchOutgoingAsync.java +++ b/java/src/IceInternal/BatchOutgoingAsync.java @@ -21,6 +21,11 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync synchronized(_monitor) { _state |= Done | OK | Sent; + if(_remoteObserver != null) + { + _remoteObserver.detach(); + _remoteObserver = null; + } _monitor.notifyAll(); return true; } @@ -33,6 +38,11 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync public void __finished(Ice.LocalException exc, boolean sent) { + if(_remoteObserver != null) + { + _remoteObserver.detach(); + _remoteObserver = null; + } __exception(exc); } } diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java index 8d2d2be737c..b64c56b8d99 100644 --- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java +++ b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java @@ -9,7 +9,7 @@ package IceInternal; -public class CommunicatorBatchOutgoingAsync extends BatchOutgoingAsync +public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult { public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, CallbackBase callback) @@ -27,100 +27,113 @@ public class CommunicatorBatchOutgoingAsync extends BatchOutgoingAsync // Assume all connections are flushed synchronously. // _sentSynchronously = true; + + // + // Attach observer + // + _observer = ObserverHelper.get(instance, operation); } - public void flushConnection(Ice.Connection con) + public void flushConnection(Ice.ConnectionI con) { - synchronized(_monitor) + class BatchOutgoingAsyncI extends BatchOutgoingAsync { - ++_useCount; - } - con.begin_flushBatchRequests(_cb); - } + public + BatchOutgoingAsyncI() + { + super(CommunicatorBatchOutgoingAsync.this._communicator, + CommunicatorBatchOutgoingAsync.this._instance, + CommunicatorBatchOutgoingAsync.this._operation, + null); + } - public void ready() - { - check(null, null, true); - } + public boolean + __sent(Ice.ConnectionI con) + { + if(_remoteObserver != null) + { + _remoteObserver.detach(); + _remoteObserver = null; + } + check(false); + return false; + } - private void completed(Ice.AsyncResult r) - { - Ice.Connection con = r.getConnection(); - assert(con != null); + public void + __finished(Ice.LocalException ex, boolean sent) + { + if(_remoteObserver != null) + { + _remoteObserver.detach(); + _remoteObserver = null; + } + check(false); + } - try + public void + __attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt) + { + if(CommunicatorBatchOutgoingAsync.this._observer != null) + { + _remoteObserver = CommunicatorBatchOutgoingAsync.this._observer.getRemoteObserver(info, endpt); + if(_remoteObserver != null) + { + _remoteObserver.attach(); + } + } + } + }; + + synchronized(_monitor) { - con.end_flushBatchRequests(r); - assert(false); // completed() should only be called when an exception occurs. + ++_useCount; } - catch(Ice.LocalException ex) + + int status = con.flushAsyncBatchRequests(new BatchOutgoingAsyncI()); + if((status & AsyncStatus.Sent) > 0) { - check(r, ex, false); + _sentSynchronously = false; } } - private void sent(Ice.AsyncResult r) + public void ready() { - check(r, null, r.sentSynchronously()); + check(true); } - private void check(Ice.AsyncResult r, Ice.LocalException ex, boolean userThread) + private void check(boolean userThread) { - boolean done = false; - synchronized(_monitor) { assert(_useCount > 0); - --_useCount; - - // - // We report that the communicator flush request was sent synchronously - // if all of the connection flush requests are sent synchronously. - // - if((r != null && !r.sentSynchronously()) || ex != null) + if(--_useCount > 0) { - _sentSynchronously = false; + return; } - - if(_useCount == 0) + + if(_observer != null) { - done = true; - _state |= Done | OK | Sent; - _monitor.notifyAll(); + _observer.detach(); + _observer = null; } - } - if(done) - { - // - // sentSynchronously_ is immutable here. - // - if(!_sentSynchronously && userThread) - { - __sentAsync(); - } - else - { - assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible. - __sent(); - } + _state |= Done | OK | Sent; + _monitor.notifyAll(); } - } - - private int _useCount; - private Ice.Callback _cb = new Ice.Callback() - { - @Override - public void completed(Ice.AsyncResult r) + // + // sentSynchronously_ is immutable here. + // + if(!_sentSynchronously && userThread) { - CommunicatorBatchOutgoingAsync.this.completed(r); + __sentAsync(); } - - @Override - public void sent(Ice.AsyncResult r) + else { - CommunicatorBatchOutgoingAsync.this.sent(r); + assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible. + __sentInternal(); } - }; + } + + private int _useCount; } diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 4991933cc6a..14574cd061a 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -9,6 +9,8 @@ package IceInternal; +import Ice.Instrumentation.InvocationObserver; + public class ConnectRequestHandler implements RequestHandler, Reference.GetConnectionCallback, RouterInfo.AddProxyCallback { @@ -182,18 +184,19 @@ public class ConnectRequestHandler } public Outgoing - getOutgoing(String operation, Ice.OperationMode mode, java.util.Map<String, String> context) + getOutgoing(String operation, Ice.OperationMode mode, java.util.Map<String, String> context, + InvocationObserver observer) throws LocalExceptionWrapper { synchronized(this) { if(!initialized()) { - return new IceInternal.Outgoing(this, operation, mode, context); + return new IceInternal.Outgoing(this, operation, mode, context, observer); } } - return _connection.getOutgoing(this, operation, mode, context); + return _connection.getOutgoing(this, operation, mode, context, observer); } public void diff --git a/java/src/IceInternal/ConnectionReaper.java b/java/src/IceInternal/ConnectionReaper.java index 00e1b9a4455..b97ac399267 100644 --- a/java/src/IceInternal/ConnectionReaper.java +++ b/java/src/IceInternal/ConnectionReaper.java @@ -12,9 +12,13 @@ package IceInternal; public class ConnectionReaper { synchronized public void - add(Ice.ConnectionI connection) + add(Ice.ConnectionI connection, Ice.Instrumentation.Observer observer) { _connections.add(connection); + if(observer != null) + { + observer.detach(); + } } synchronized public java.util.List<Ice.ConnectionI> diff --git a/java/src/IceInternal/ConnectionRequestHandler.java b/java/src/IceInternal/ConnectionRequestHandler.java index 68df847139f..6ec4fab8475 100644 --- a/java/src/IceInternal/ConnectionRequestHandler.java +++ b/java/src/IceInternal/ConnectionRequestHandler.java @@ -9,6 +9,8 @@ package IceInternal; +import Ice.Instrumentation.InvocationObserver; + public class ConnectionRequestHandler implements RequestHandler { public void @@ -64,10 +66,11 @@ public class ConnectionRequestHandler implements RequestHandler } public Outgoing - getOutgoing(String operation, Ice.OperationMode mode, java.util.Map<String, String> context) + getOutgoing(String operation, Ice.OperationMode mode, java.util.Map<String, String> context, + InvocationObserver observer) throws LocalExceptionWrapper { - return _connection.getOutgoing(this, operation, mode, context); + return _connection.getOutgoing(this, operation, mode, context, observer); } public void diff --git a/java/src/IceInternal/EndpointHostResolver.java b/java/src/IceInternal/EndpointHostResolver.java index 9c10ce27570..2c5f4e99ecf 100644 --- a/java/src/IceInternal/EndpointHostResolver.java +++ b/java/src/IceInternal/EndpointHostResolver.java @@ -17,6 +17,7 @@ public class EndpointHostResolver try { _thread = new HelperThread(); + updateObserver(); if(_instance.initializationData().properties.getProperty("Ice.ThreadPriority").length() > 0) { _thread.setPriority(Util.getThreadPriorityProperty(_instance.initializationData().properties, "Ice")); @@ -31,6 +32,43 @@ public class EndpointHostResolver } } + public java.util.List<Connector> + resolve(String host, int port, EndpointI endpoint) + { + Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.Observer observer = null; + if(obsv != null) + { + observer = obsv.getEndpointLookupObserver(endpoint); + if(observer != null) + { + observer.attach(); + } + } + + java.util.List<Connector> connectors = null; + try + { + connectors = endpoint.connectors(Network.getAddresses(host, port, _instance.protocolSupport())); + } + catch(Ice.LocalException ex) + { + if(observer != null) + { + observer.failed(ex.ice_name()); + } + throw ex; + } + finally + { + if(observer != null) + { + observer.detach(); + } + } + return connectors; + } + synchronized public void resolve(String host, int port, EndpointI endpoint, EndpointI_connectors callback) { @@ -47,6 +85,17 @@ public class EndpointHostResolver entry.port = port; entry.endpoint = endpoint; entry.callback = callback; + + Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + if(obsv != null) + { + entry.observer = obsv.getEndpointLookupObserver(endpoint); + if(entry.observer != null) + { + entry.observer.attach(); + } + } + _queue.add(entry); notify(); } @@ -71,6 +120,10 @@ public class EndpointHostResolver catch(InterruptedException ex) { } + if(_observer != null) + { + _observer.detach(); + } } } @@ -79,7 +132,8 @@ public class EndpointHostResolver { while(true) { - ResolveEntry resolve; + ResolveEntry r; + Ice.Instrumentation.ThreadObserver threadObserver; synchronized(this) { while(!_destroyed && _queue.isEmpty()) @@ -98,40 +152,88 @@ public class EndpointHostResolver break; } - resolve = (ResolveEntry)_queue.removeFirst(); + r = (ResolveEntry)_queue.removeFirst(); + threadObserver = _observer; } + int protocol = _instance.protocolSupport(); try { - resolve.callback.connectors( - resolve.endpoint.connectors( - Network.getAddresses(resolve.host, resolve.port, _instance.protocolSupport()))); + if(threadObserver != null) + { + threadObserver.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateIdle, + Ice.Instrumentation.ThreadState.ThreadStateInUseForOther); + } + + r.callback.connectors(r.endpoint.connectors(Network.getAddresses(r.host, r.port, protocol))); + + if(threadObserver != null) + { + threadObserver.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateInUseForOther, + Ice.Instrumentation.ThreadState.ThreadStateIdle); + } } catch(Ice.LocalException ex) { - resolve.callback.exception(ex); + if(r.observer != null) + { + r.observer.failed(ex.ice_name()); + } + r.callback.exception(ex); + } + finally + { + if(r.observer != null) + { + r.observer.detach(); + } } } - for(ResolveEntry p : _queue) + for(ResolveEntry entry : _queue) { - p.callback.exception(new Ice.CommunicatorDestroyedException()); + Ice.CommunicatorDestroyedException ex = new Ice.CommunicatorDestroyedException(); + entry.callback.exception(ex); + if(entry.observer != null) + { + entry.observer.failed(ex.ice_name()); + entry.observer.detach(); + } } _queue.clear(); } + synchronized public void + updateObserver() + { + Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + if(obsv != null) + { + _observer = obsv.getThreadObserver("Communicator", + _thread.getName(), + Ice.Instrumentation.ThreadState.ThreadStateIdle, + _observer); + if(_observer != null) + { + _observer.attach(); + } + } + } + static class ResolveEntry { String host; int port; EndpointI endpoint; EndpointI_connectors callback; + Ice.Instrumentation.Observer observer; } private final Instance _instance; private boolean _destroyed; private java.util.LinkedList<ResolveEntry> _queue = new java.util.LinkedList<ResolveEntry>(); - + private Ice.Instrumentation.ThreadObserver _observer; + private final class HelperThread extends Thread { HelperThread() @@ -141,7 +243,7 @@ public class EndpointHostResolver { threadName += "-"; } - setName(threadName + "Ice.EndpointHostResolverThread"); + setName(threadName + "Ice.HostResolver"); } public void diff --git a/java/src/IceInternal/EndpointI.java b/java/src/IceInternal/EndpointI.java index 0000be4a8fa..129b4e25190 100644 --- a/java/src/IceInternal/EndpointI.java +++ b/java/src/IceInternal/EndpointI.java @@ -11,10 +11,11 @@ package IceInternal; abstract public class EndpointI implements Ice.Endpoint, java.lang.Comparable<EndpointI> { - public EndpointI(Ice.ProtocolVersion protocol, Ice.EncodingVersion encoding) + public EndpointI(Ice.ProtocolVersion protocol, Ice.EncodingVersion encoding, String connectionId) { _protocol = protocol; _encoding = encoding; + _connectionId = connectionId; } public EndpointI() @@ -97,6 +98,14 @@ abstract public class EndpointI implements Ice.Endpoint, java.lang.Comparable<En } // + // Return the connection ID + // + public String connectionId() + { + return _connectionId; + } + + // // Return a server side transceiver for this endpoint, or null if a // transceiver can only be created by an acceptor. In case a // transceiver is created, this operation also returns a new @@ -182,6 +191,11 @@ abstract public class EndpointI implements Ice.Endpoint, java.lang.Comparable<En return 1; } + if(!_connectionId.equals(p._connectionId)) + { + return _connectionId.compareTo(p._connectionId); + } + return 0; } @@ -243,4 +257,5 @@ abstract public class EndpointI implements Ice.Endpoint, java.lang.Comparable<En protected Ice.ProtocolVersion _protocol; protected Ice.EncodingVersion _encoding; + protected String _connectionId = ""; } diff --git a/java/src/IceInternal/Incoming.java b/java/src/IceInternal/Incoming.java index 819a6dda886..a990559bd32 100644 --- a/java/src/IceInternal/Incoming.java +++ b/java/src/IceInternal/Incoming.java @@ -9,6 +9,8 @@ package IceInternal; +import Ice.Instrumentation.CommunicatorObserver; + final public class Incoming extends IncomingBase implements Ice.Request { public @@ -119,6 +121,16 @@ final public class Incoming extends IncomingBase implements Ice.Request _current.ctx.put(first, second); } + CommunicatorObserver obsv = _instance.initializationData().observer; + if(obsv != null) + { + _observer = obsv.getDispatchObserver(_current); + if(_observer != null) + { + _observer.attach(); + } + } + // // Don't put the code above into the try block below. Exceptions // in the code above are considered fatal, and must propagate to @@ -145,7 +157,12 @@ final public class Incoming extends IncomingBase implements Ice.Request catch(Ice.UserException ex) { Ice.EncodingVersion encoding = _is.skipEncaps(); // Required for batch requests. - + + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + if(_response) { _os.writeByte(ReplyStatus.replyUserException); @@ -159,6 +176,11 @@ final public class Incoming extends IncomingBase implements Ice.Request _connection.sendNoResponse(); } + if(_observer != null) + { + _observer.detach(); + _observer = null; + } _connection = null; return; } @@ -238,6 +260,11 @@ final public class Incoming extends IncomingBase implements Ice.Request _connection.sendNoResponse(); } + if(_observer != null) + { + _observer.detach(); + _observer = null; + } _connection = null; } diff --git a/java/src/IceInternal/IncomingAsync.java b/java/src/IceInternal/IncomingAsync.java index ceb558b0e3b..2a23734ea08 100644 --- a/java/src/IceInternal/IncomingAsync.java +++ b/java/src/IceInternal/IncomingAsync.java @@ -121,6 +121,11 @@ public class IncomingAsync extends IncomingBase implements Ice.AMDCallback _connection.sendNoResponse(); } + if(_observer != null) + { + _observer.detach(); + _observer = null; + } _connection = null; } catch(Ice.LocalException ex) diff --git a/java/src/IceInternal/IncomingBase.java b/java/src/IceInternal/IncomingBase.java index f3d89f3b71d..6d1ec52dde6 100644 --- a/java/src/IceInternal/IncomingBase.java +++ b/java/src/IceInternal/IncomingBase.java @@ -62,6 +62,9 @@ public class IncomingBase _instance = other._instance; //other._instance = null; // Don't reset _instance. + _observer = other._observer; + other._observer = null; + _servant = other._servant; other._servant = null; @@ -155,6 +158,10 @@ public class IncomingBase public void __writeUserException(Ice.UserException ex, Ice.FormatType format) { + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } BasicStream __os = __startWriteParams(format); __os.writeUserException(ex); __endWriteParams(false); @@ -209,6 +216,8 @@ public class IncomingBase _cookie.value = null; } + _observer = null; + if(_os != null) { _os.reset(); @@ -259,6 +268,11 @@ public class IncomingBase { assert(_connection != null); + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + // // The operation may have already marshaled a reply; we must overwrite that reply. // @@ -276,6 +290,11 @@ public class IncomingBase _connection.sendNoResponse(); } + if(_observer != null) + { + _observer.detach(); + _observer = null; + } _connection = null; } catch(java.lang.Exception ex) @@ -315,6 +334,11 @@ public class IncomingBase { __warning(ex); } + + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } if(_response) { @@ -366,6 +390,11 @@ public class IncomingBase __warning(ex); } + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + if(_response) { _os.resize(Protocol.headerSize + 4, false); // Reply status position. @@ -385,6 +414,11 @@ public class IncomingBase __warning(ex); } + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + if(_response) { _os.resize(Protocol.headerSize + 4, false); // Reply status position. @@ -404,6 +438,11 @@ public class IncomingBase __warning(ex); } + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + if(_response) { _os.resize(Protocol.headerSize + 4, false); // Reply status position. @@ -423,6 +462,11 @@ public class IncomingBase __warning(ex); } + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + if(_response) { _os.resize(Protocol.headerSize + 4, false); // Reply status position. @@ -448,6 +492,11 @@ public class IncomingBase __warning(ex); } + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + if(_response) { _os.resize(Protocol.headerSize + 4, false); // Reply status position. @@ -473,6 +522,11 @@ public class IncomingBase __warning(ex); } + if(_observer != null) + { + _observer.failed(ex.getClass().getName()); + } + if(_response) { _os.resize(Protocol.headerSize + 4, false); // Reply status position. @@ -491,6 +545,11 @@ public class IncomingBase } } + if(_observer != null) + { + _observer.detach(); + _observer = null; + } _connection = null; } @@ -499,6 +558,7 @@ public class IncomingBase protected Ice.Object _servant; protected Ice.ServantLocator _locator; protected Ice.LocalObjectHolder _cookie; + protected Ice.Instrumentation.Observer _observer; protected boolean _response; protected byte _compress; diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index c1648e9c10c..5c5a5acaef2 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -29,6 +29,15 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice setState(StateClosed); } + public synchronized void + updateConnectionObservers() + { + for(Ice.ConnectionI connection : _connections) + { + connection.updateObserver(); + } + } + public void waitUntilHolding() { diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index 3e9367fad00..55119014184 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -11,6 +11,36 @@ package IceInternal; public final class Instance { + private class ObserverUpdaterI implements Ice.Instrumentation.ObserverUpdater + { + ObserverUpdaterI(Instance instance) + { + _instance = instance; + } + + @Override public void + updateConnectionObservers() + { + _instance.outgoingConnectionFactory().updateConnectionObservers(); + _instance.objectAdapterFactory().updateConnectionObservers(); + } + + @Override public void + updateThreadObservers() + { + _instance.clientThreadPool().updateObservers(); + ThreadPool serverThreadPool = _instance.serverThreadPool(false); + if(serverThreadPool != null) + { + serverThreadPool.updateObservers(); + } + _instance.objectAdapterFactory().updateThreadObservers(); + _instance.endpointHostResolver().updateObserver(); + } + + final private Instance _instance; + } + public Ice.InitializationData initializationData() { @@ -159,14 +189,14 @@ public final class Instance } public synchronized ThreadPool - serverThreadPool() + serverThreadPool(boolean create) { if(_state == StateDestroyed) { throw new Ice.CommunicatorDestroyedException(); } - if(_serverThreadPool == null) // Lazy initialization. + if(_serverThreadPool == null && create) // Lazy initialization. { int timeout = _initData.properties.getPropertyAsInt("Ice.ServerIdleTime"); _serverThreadPool = new ThreadPool(this, "Ice.ThreadPool.Server", timeout); @@ -726,9 +756,30 @@ public final class Instance _adminFacetFilter.addAll(java.util.Arrays.asList(facetFilter)); } - _adminFacets.put("Properties", new PropertiesAdminI("Properties", _initData.properties, _initData.logger)); _adminFacets.put("Process", new ProcessI(communicator)); + MetricsAdminI admin = new MetricsAdminI(_initData.properties, _initData.logger); + _adminFacets.put("MetricsAdmin", admin); + + PropertiesAdminI props = new PropertiesAdminI("Properties", _initData.properties, _initData.logger); + _adminFacets.put("Properties", props); + + // + // Setup the communicator observer only if the user didn't already set an + // Ice observer resolver and if the admininistrative endpoints are set. + // + if(_initData.observer == null && + (_adminFacetFilter.isEmpty() || _adminFacetFilter.contains("MetricsAdmin")) && + _initData.properties.getProperty("Ice.Admin.Endpoints").length() > 0) + { + IceMX.CommunicatorObserverI observer = new IceMX.CommunicatorObserverI(admin); + _initData.observer = observer; + + // + // Make sure the admin plugin receives property updates. + // + props.addUpdateCallback(admin); + } } catch(Ice.LocalException ex) { @@ -780,6 +831,14 @@ public final class Instance pluginManagerImpl.loadPlugins(args); // + // Set observer updater + // + if(_initData.observer != null) + { + _initData.observer.setObserverUpdater(new ObserverUpdaterI(this)); + } + + // // Create threads. // try diff --git a/java/src/IceInternal/MetricsAdminI.java b/java/src/IceInternal/MetricsAdminI.java new file mode 100644 index 00000000000..80df0eeccf5 --- /dev/null +++ b/java/src/IceInternal/MetricsAdminI.java @@ -0,0 +1,354 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class MetricsAdminI extends IceMX._MetricsAdminDisp implements Ice.PropertiesAdminUpdateCallback +{ + final static private String[] viewSuffixes = + { + "Disabled", + "GroupBy", + "Accept.*", + "Reject.*", + "RetainDetached", + "Map.*", + }; + + static void + validateProperties(String prefix, Ice.Properties properties, String[] suffixes) + { + java.util.Map<String, String> props = properties.getPropertiesForPrefix(prefix); + java.util.List<String> unknownProps = new java.util.ArrayList<String>(); + for(String prop : props.keySet()) + { + boolean valid = false; + for(String suffix : suffixes) + { + if(IceUtilInternal.StringUtil.match(prop, prefix + suffix, false)) + { + valid = true; + break; + } + } + + if(!valid) + { + unknownProps.add(prop); + } + } + + if(unknownProps.size() != 0 && properties.getPropertyAsIntWithDefault("Ice.Warn.UnknownProperties", 1) > 0) + { + StringBuffer message = new StringBuffer("found unknown IceMX properties for `"); + message.append(prefix.substring(0, prefix.length() - 1)); + message.append("':"); + for(String p : unknownProps) + { + message.append("\n "); + message.append(p); + } + Ice.Util.getProcessLogger().warning(message.toString()); + } + } + + static class MetricsMapFactory<T extends IceMX.Metrics> + { + public MetricsMapFactory(Runnable updater, Class<T> cl) + { + _updater = updater; + _class = cl; + } + + public void + update() + { + assert(_updater != null); + _updater.run(); + } + + public MetricsMap<T> + create(String mapPrefix, Ice.Properties properties) + { + return new MetricsMap<T>(mapPrefix, _class, properties, _subMaps); + } + + public <S extends IceMX.Metrics> void + registerSubMap(String subMap, Class<S> cl, java.lang.reflect.Field field) + { + _subMaps.put(subMap, new MetricsMap.SubMapFactory<S>(cl, field)); + } + + final private Runnable _updater; + final private Class<T> _class; + final private java.util.Map<String, MetricsMap.SubMapFactory<?>> _subMaps = + new java.util.HashMap<String, MetricsMap.SubMapFactory<?>>(); + }; + + public MetricsAdminI(Ice.Properties properties, Ice.Logger logger) + { + _logger = logger; + _properties = properties; + updateViews(); + } + + public void updateViews() + { + java.util.Set<MetricsMapFactory> updatedMaps = new java.util.HashSet<MetricsMapFactory>(); + synchronized(this) + { + String viewsPrefix = "IceMX.Metrics."; + java.util.Map<String, String> viewsProps = _properties.getPropertiesForPrefix(viewsPrefix); + java.util.Map<String, MetricsViewI> views = new java.util.HashMap<String, MetricsViewI>(); + for(java.util.Map.Entry<String, String> e : viewsProps.entrySet()) + { + String viewName = e.getKey().substring(viewsPrefix.length()); + int dotPos = viewName.indexOf('.'); + if(dotPos > 0) + { + viewName = viewName.substring(0, dotPos); + } + + if(views.containsKey(viewName)) + { + continue; // View already configured. + } + + validateProperties(viewsPrefix + viewName + ".", _properties, viewSuffixes); + + if(_properties.getPropertyAsIntWithDefault(viewsPrefix + viewName + ".Disabled", 0) > 0) + { + continue; // The view is disabled + } + + // + // Create the view or update it. + // + MetricsViewI v = _views.get(viewName); + if(v == null) + { + v = new MetricsViewI(viewName); + } + views.put(viewName, v); + + for(java.util.Map.Entry<String, MetricsMapFactory<?>> f : _factories.entrySet()) + { + if(v.addOrUpdateMap(_properties, f.getKey(), f.getValue(), _logger)) + { + updatedMaps.add(f.getValue()); + } + } + } + java.util.Map<String, MetricsViewI> tmp = _views; + _views = views; + views = tmp; + + // + // Go through removed views to collect maps to update. + // + for(java.util.Map.Entry<String, MetricsViewI> v : views.entrySet()) + { + if(!_views.containsKey(v.getKey())) + { + for(String n : v.getValue().getMaps()) + { + updatedMaps.add(_factories.get(n)); + } + } + } + } + + // + // Call the updaters to update the maps. + // + for(MetricsMapFactory f : updatedMaps) + { + f.update(); + } + } + + synchronized public String[] + getMetricsViewNames(Ice.Current current) + { + return _views.keySet().toArray(new String[0]); + } + + synchronized public java.util.Map<String, IceMX.Metrics[]> + getMetricsView(String viewName, Ice.Current current) + throws IceMX.UnknownMetricsView + { + MetricsViewI view = _views.get(viewName); + if(view == null) + { + throw new IceMX.UnknownMetricsView(); + } + return view.getMetrics(); + } + + synchronized public IceMX.MetricsFailures[] + getMapMetricsFailures(String viewName, String mapName, Ice.Current current) + throws IceMX.UnknownMetricsView + { + MetricsViewI view = _views.get(viewName); + if(view == null) + { + throw new IceMX.UnknownMetricsView(); + } + return view.getFailures(mapName); + } + + synchronized public IceMX.MetricsFailures + getMetricsFailures(String viewName, String mapName, String id, Ice.Current current) + throws IceMX.UnknownMetricsView + { + MetricsViewI view = _views.get(viewName); + if(view == null) + { + throw new IceMX.UnknownMetricsView(); + } + return view.getFailures(mapName, id); + } + + public <T extends IceMX.Metrics> void + registerMap(String map, Class<T> cl, Runnable updater) + { + boolean updated; + MetricsMapFactory<T> factory; + synchronized(this) + { + factory = new MetricsMapFactory<T>(updater, cl); + _factories.put(map, factory); + updated = addOrUpdateMap(map, factory); + } + if(updated) + { + factory.update(); + } + } + + synchronized public <S extends IceMX.Metrics> void + registerSubMap(String map, String subMap, Class<S> cl, java.lang.reflect.Field field) + { + boolean updated; + MetricsMapFactory<?> factory; + synchronized(this) + { + factory = _factories.get(map); + if(factory == null) + { + return; + } + factory.registerSubMap(subMap, cl, field); + removeMap(map); + updated = addOrUpdateMap(map, factory); + } + if(updated) + { + factory.update(); + } + } + + public void + unregisterMap(String mapName) + { + boolean updated; + MetricsMapFactory<?> factory; + synchronized(this) + { + factory = _factories.remove(mapName); + if(factory == null) + { + return; + } + updated = removeMap(mapName); + } + if(updated) + { + factory.update(); + } + } + + public <T extends IceMX.Metrics> java.util.List<MetricsMap<T>> + getMaps(String mapName, Class<T> cl) + { + java.util.List<MetricsMap<T>> maps = new java.util.ArrayList<MetricsMap<T>>(); + for(MetricsViewI v : _views.values()) + { + MetricsMap<T> map = v.getMap(mapName, cl); + if(map != null) + { + maps.add(map); + } + } + return maps; + } + + public Ice.Logger + getLogger() + { + return _logger; + } + + public void + setProperties(Ice.Properties properties) + { + _properties = properties; + } + + public void + updated(java.util.Map<String, String> props) + { + for(java.util.Map.Entry<String, String> e : props.entrySet()) + { + if(e.getKey().indexOf("IceMX.") == 0) + { + // Udpate the metrics views using the new configuration. + try + { + updateViews(); + } + catch(Exception ex) + { + _logger.warning("unexpected exception while updating metrics view configuration:\n" + + ex.toString()); + } + return; + } + } + } + + private boolean + addOrUpdateMap(String mapName, MetricsMapFactory factory) + { + boolean updated = false; + for(MetricsViewI v : _views.values()) + { + updated |= v.addOrUpdateMap(_properties, mapName, factory, _logger); + } + return updated; + } + + private boolean + removeMap(String mapName) + { + boolean updated = false; + for(MetricsViewI v : _views.values()) + { + updated |= v.removeMap(mapName); + } + return updated; + } + + private Ice.Properties _properties; + final private Ice.Logger _logger; + final private java.util.Map<String, MetricsMapFactory<?>> _factories = + new java.util.HashMap<String, MetricsMapFactory<?>>(); + + private java.util.Map<String, MetricsViewI> _views = new java.util.HashMap<String, MetricsViewI>(); +}
\ No newline at end of file diff --git a/java/src/IceInternal/MetricsMap.java b/java/src/IceInternal/MetricsMap.java new file mode 100644 index 00000000000..d1efd1d4bb7 --- /dev/null +++ b/java/src/IceInternal/MetricsMap.java @@ -0,0 +1,530 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class MetricsMap<T extends IceMX.Metrics> +{ + final private String[] mapSuffixes = + { + "GroupBy", + "Accept.*", + "Reject.*", + "RetainDetached", + }; + + public class Entry implements Comparable<Entry> + { + Entry(T obj) + { + _object = obj; + } + + synchronized public void + failed(String exceptionName) + { + ++_object.failures; + Integer count = _failures.get(exceptionName); + _failures.put(exceptionName, new Integer(count == null ? 1 : count + 1)); + } + + synchronized IceMX.MetricsFailures + getFailures() + { + if(_failures.isEmpty()) + { + return null; + } + IceMX.MetricsFailures f = new IceMX.MetricsFailures(); + f.id = _object.id; + f.failures = new java.util.HashMap<String, Integer>(_failures); + return f; + } + + @SuppressWarnings("unchecked") + public <S extends IceMX.Metrics> MetricsMap<S>.Entry + getMatching(String mapName, IceMX.MetricsHelper<S> helper, Class<S> cl) + { + SubMap<S> m; + synchronized(this) + { + m = _subMaps != null ? (SubMap<S>)_subMaps.get(mapName) : null; + if(m == null) + { + m = createSubMap(mapName, cl); + if(m == null) + { + return null; + } + if(_subMaps == null) + { + _subMaps = new java.util.HashMap<String, SubMap<?>>(); + } + _subMaps.put(mapName, m); + } + } + return m.getMatching(helper); + } + + synchronized public void + attach(IceMX.MetricsHelper<T> helper) + { + ++_object.total; + ++_object.current; + helper.initMetrics(_object); + } + + public void + detach(long lifetime) + { + synchronized(this) + { + _object.totalLifetime += lifetime; + if(--_object.current > 0) + { + return; + } + } + detached(this); + } + + synchronized public boolean + isDetached() + { + return _object.current == 0; + } + + synchronized public void + execute(IceMX.Observer.MetricsUpdate<T> func) + { + func.update(_object); + } + + @SuppressWarnings("unchecked") + synchronized public IceMX.Metrics + clone() + { + T metrics = (T)_object.clone(); + if(_subMaps != null) + { + for(SubMap s : _subMaps.values()) + { + s.addSubMapToMetrics(metrics); + } + } + return metrics; + } + + public int + compareTo(Entry e) + { + return _object.id.compareTo(e._object.id); + } + + private T _object; + private java.util.Map<String, Integer> _failures = new java.util.HashMap<String, Integer>(); + private java.util.Map<String, SubMap<?>> _subMaps; + }; + + static class SubMap<S extends IceMX.Metrics> + { + public + SubMap(MetricsMap<S> map, java.lang.reflect.Field field) + { + _map = map; + _field = field; + } + + public MetricsMap<S>.Entry + getMatching(IceMX.MetricsHelper<S> helper) + { + return _map.getMatching(helper); + } + + public void + addSubMapToMetrics(IceMX.Metrics metrics) + { + try + { + _field.set(metrics, _map.getMetrics()); + } + catch(Exception ex) + { + assert(false); + } + } + + final private MetricsMap<S> _map; + final private java.lang.reflect.Field _field; + }; + + static class SubMapCloneFactory<S extends IceMX.Metrics> + { + public SubMapCloneFactory(MetricsMap<S> map, java.lang.reflect.Field field) + { + _map = map; + _field = field; + } + + public SubMap<S> + create() + { + return new SubMap<S>(new MetricsMap<S>(_map), _field); + } + + final private MetricsMap<S> _map; + final private java.lang.reflect.Field _field; + }; + + static class SubMapFactory<S extends IceMX.Metrics> + { + SubMapFactory(Class<S> cl, java.lang.reflect.Field field) + { + _class = cl; + _field = field; + } + + SubMapCloneFactory<S> + createCloneFactory(String subMapPrefix, Ice.Properties properties) + { + return new SubMapCloneFactory<S>(new MetricsMap<S>(subMapPrefix, _class, properties, null), _field); + } + + final private Class<S> _class; + final private java.lang.reflect.Field _field; + }; + + MetricsMap(String mapPrefix, Class<T> cl, Ice.Properties props, java.util.Map<String, SubMapFactory<?>> subMaps) + { + _properties = props.getPropertiesForPrefix(mapPrefix); + + _retain = props.getPropertyAsIntWithDefault(mapPrefix + "RetainDetached", 10); + _accept = parseRule(props, mapPrefix + "Accept"); + _reject = parseRule(props, mapPrefix + "Reject"); + _groupByAttributes = new java.util.ArrayList<String>(); + _groupBySeparators = new java.util.ArrayList<String>(); + _class = cl; + + String groupBy = props.getPropertyWithDefault(mapPrefix + "GroupBy", "id"); + if(!groupBy.isEmpty()) + { + String v = ""; + boolean attribute = Character.isLetter(groupBy.charAt(0)) || Character.isDigit(groupBy.charAt(0)); + if(!attribute) + { + _groupByAttributes.add(""); + } + + for(char p : groupBy.toCharArray()) + { + boolean isAlphaNum = Character.isLetter(p) || Character.isDigit(p) || p == '.'; + if(attribute && !isAlphaNum) + { + _groupByAttributes.add(v); + v = "" + p; + attribute = false; + } + else if(!attribute && isAlphaNum) + { + _groupBySeparators.add(v); + v = "" + p; + attribute = true; + } + else + { + v += p; + } + } + + if(attribute) + { + _groupByAttributes.add(v); + } + else + { + _groupBySeparators.add(v); + } + } + + if(subMaps != null && !subMaps.isEmpty()) + { + _subMaps = new java.util.HashMap<String, SubMapCloneFactory<?>>(); + + java.util.List<String> subMapNames = new java.util.ArrayList<String>(); + for(java.util.Map.Entry<String, SubMapFactory<?>> e : subMaps.entrySet()) + { + subMapNames.add(e.getKey()); + String subMapsPrefix = mapPrefix + "Map."; + String subMapPrefix = subMapsPrefix + e.getKey() + '.'; + if(props.getPropertiesForPrefix(subMapPrefix).isEmpty()) + { + if(props.getPropertiesForPrefix(subMapsPrefix).isEmpty()) + { + subMapPrefix = mapPrefix; + } + else + { + continue; // This sub-map isn't configured. + } + } + + _subMaps.put(e.getKey(), e.getValue().createCloneFactory(subMapPrefix, props)); + } + validateProperties(mapPrefix, props, subMapNames); + } + else + { + _subMaps = null; + } + } + + MetricsMap(MetricsMap<T> map) + { + _properties = map._properties; + _groupByAttributes = map._groupByAttributes; + _groupBySeparators = map._groupBySeparators; + _retain = map._retain; + _accept = map._accept; + _reject = map._reject; + _class = map._class; + _subMaps = map._subMaps; + } + + private void + validateProperties(String prefix, Ice.Properties props, java.util.Collection<String> subMaps) + { + if(subMaps.isEmpty()) + { + MetricsAdminI.validateProperties(prefix, props, mapSuffixes); + return; + } + + java.util.List<String> suffixes = new java.util.ArrayList<String>(java.util.Arrays.asList(mapSuffixes)); + for(String s : subMaps) + { + String suffix = "Map." + s + "."; + MetricsAdminI.validateProperties(prefix + suffix, props, mapSuffixes); + suffixes.add(suffix + '*'); + } + MetricsAdminI.validateProperties(prefix, props, suffixes.toArray(new String[suffixes.size()])); + } + + java.util.Map<String, String> + getProperties() + { + return _properties; + } + + synchronized IceMX.Metrics[] + getMetrics() + { + IceMX.Metrics[] metrics = new IceMX.Metrics[_objects.size()]; + int i = 0; + for(Entry e : _objects.values()) + { + metrics[i++] = e.clone(); + } + return metrics; + } + + synchronized IceMX.MetricsFailures[] + getFailures() + { + java.util.List<IceMX.MetricsFailures> failures = new java.util.ArrayList<IceMX.MetricsFailures>(); + for(Entry e : _objects.values()) + { + IceMX.MetricsFailures f = e.getFailures(); + if(f != null) + { + failures.add(f); + } + } + return failures.toArray(new IceMX.MetricsFailures[failures.size()]); + } + + synchronized IceMX.MetricsFailures + getFailures(String id) + { + Entry e = _objects.get(id); + if(e != null) + { + return e.getFailures(); + } + return null; + } + + @SuppressWarnings("unchecked") + public <S extends IceMX.Metrics> SubMap<S> + createSubMap(String subMapName, Class<S> cl) + { + if(_subMaps == null) + { + return null; + } + SubMapCloneFactory<S> factory = (SubMapCloneFactory<S>)_subMaps.get(subMapName); + if(factory != null) + { + return factory.create(); + } + return null; + } + + public Entry + getMatching(IceMX.MetricsHelper<T> helper) + { + // + // Check the accept and reject filters. + // + for(java.util.Map.Entry<String, java.util.regex.Pattern> e : _accept.entrySet()) + { + if(!match(e.getKey(), e.getValue(), helper, false)) + { + return null; + } + } + + for(java.util.Map.Entry<String, java.util.regex.Pattern> e : _reject.entrySet()) + { + if(match(e.getKey(), e.getValue(), helper, false)) + { + return null; + } + } + + // + // Compute the key from the GroupBy property. + // + String key; + try + { + if(_groupByAttributes.size() == 1) + { + key = helper.resolve(_groupByAttributes.get(0)); + } + else + { + StringBuilder os = new StringBuilder(); + java.util.Iterator<String> q = _groupBySeparators.iterator(); + for(String p : _groupByAttributes) + { + os.append(helper.resolve(p)); + if(q.hasNext()) + { + os.append(q.next()); + } + } + key = os.toString(); + } + } + catch(Exception ex) + { + return null; + } + + // + // Lookup the metrics object. + // + synchronized(this) + { + Entry e = _objects.get(key); + if(e == null) + { + try + { + T t = _class.newInstance(); + t.id = key; + e = new Entry(t); + _objects.put(key, e); + } + catch(Exception ex) + { + assert(false); + } + } + return e; + } + } + + private void + detached(Entry entry) + { + if(_retain == 0) + { + return; + } + + synchronized(this) + { + if(_detachedQueue == null) + { + _detachedQueue = new java.util.LinkedList<Entry>(); + } + assert(_detachedQueue.size() <= _retain); + + // Compress the queue by removing entries which are no longer detached. + java.util.Iterator<Entry> p = _detachedQueue.iterator(); + while(p.hasNext()) + { + Entry e = p.next(); + if(e == entry || !e.isDetached()) + { + p.remove(); + } + } + + // If there's still no room, remove the oldest entry (at the front). + if(_detachedQueue.size() == _retain) + { + _objects.remove(_detachedQueue.pollFirst()._object.id); + } + + // Add the entry at the back of the queue. + _detachedQueue.add(entry); + } + } + + private java.util.Map<String, java.util.regex.Pattern> + parseRule(Ice.Properties properties, String name) + { + java.util.Map<String, java.util.regex.Pattern> pats = new java.util.HashMap<String, java.util.regex.Pattern>(); + java.util.Map<String, String> rules = properties.getPropertiesForPrefix(name + '.'); + for(java.util.Map.Entry<String,String> e : rules.entrySet()) + { + pats.put(e.getKey().substring(name.length() + 1), java.util.regex.Pattern.compile(e.getValue())); + } + return pats; + } + + private boolean + match(String attribute, java.util.regex.Pattern regex, IceMX.MetricsHelper<T> helper, boolean reject) + { + String value; + try + { + value = helper.resolve(attribute); + } + catch(Exception ex) + { + return !reject; + } + return regex.matcher(value).matches(); + } + + final private java.util.Map<String, String> _properties; + final private java.util.List<String> _groupByAttributes; + final private java.util.List<String> _groupBySeparators; + final private int _retain; + final private java.util.Map<String, java.util.regex.Pattern> _accept; + final private java.util.Map<String, java.util.regex.Pattern> _reject; + final private Class<T> _class; + + final private java.util.Map<String, Entry> _objects = new java.util.HashMap<String, Entry>(); + final private java.util.Map<String, SubMapCloneFactory<?>> _subMaps; + private java.util.Deque<Entry> _detachedQueue; +};
\ No newline at end of file diff --git a/java/src/IceInternal/MetricsViewI.java b/java/src/IceInternal/MetricsViewI.java new file mode 100644 index 00000000000..2eeed91d312 --- /dev/null +++ b/java/src/IceInternal/MetricsViewI.java @@ -0,0 +1,125 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class MetricsViewI +{ + MetricsViewI(String name) + { + _name = name; + } + + public boolean + addOrUpdateMap(Ice.Properties properties, String mapName, MetricsAdminI.MetricsMapFactory<?> factory, + Ice.Logger logger) + { + // + // Add maps to views configured with the given map. + // + String viewPrefix = "IceMX.Metrics." + _name + "."; + String mapsPrefix = viewPrefix + "Map."; + java.util.Map<String, String> mapsProps = properties.getPropertiesForPrefix(mapsPrefix); + + String mapPrefix; + java.util.Map<String, String> mapProps = new java.util.HashMap<String, String>(); + if(!mapsProps.isEmpty()) + { + mapPrefix = mapsPrefix + mapName + "."; + mapProps = properties.getPropertiesForPrefix(mapPrefix); + if(mapProps.isEmpty()) + { + // This map isn't configured for this view. + _maps.remove(mapName); + return true; + } + } + else + { + mapPrefix = viewPrefix; + mapProps = properties.getPropertiesForPrefix(mapPrefix); + } + + MetricsMap<?> m = _maps.get(mapName); + if(m != null && m.getProperties().equals(mapProps)) + { + return false; // The map configuration didn't change, no need to re-create. + } + + try + { + _maps.put(mapName, factory.create(mapPrefix, properties)); + } + catch(Exception ex) + { + logger.warning("unexpected exception while creating metrics map:\n" + ex); + _maps.remove(mapName); + } + return true; + } + + public boolean + removeMap(String mapName) + { + return _maps.remove(mapName) != null; + } + + public java.util.Map<String, IceMX.Metrics[]> + getMetrics() + { + java.util.Map<String, IceMX.Metrics[]> metrics = new java.util.HashMap<String, IceMX.Metrics[]>(); + for(java.util.Map.Entry<String, MetricsMap<?>> e : _maps.entrySet()) + { + IceMX.Metrics[] m = e.getValue().getMetrics(); + if(m != null) + { + metrics.put(e.getKey(), m); + } + } + return metrics; + } + + public IceMX.MetricsFailures[] + getFailures(String mapName) + { + MetricsMap<?> m = _maps.get(mapName); + if(m != null) + { + return m.getFailures(); + } + return null; + } + + public IceMX.MetricsFailures + getFailures(String mapName, String id) + { + MetricsMap<?> m = _maps.get(mapName); + if(m != null) + { + return m.getFailures(id); + } + return null; + } + + public java.util.Collection<String> + getMaps() + { + return _maps.keySet(); + } + + @SuppressWarnings("unchecked") + public <T extends IceMX.Metrics> MetricsMap<T> + getMap(String mapName, Class<T> cl) + { + return (MetricsMap<T>)_maps.get(mapName); + } + + final private String _name; + final private java.util.Map<String, MetricsMap<?>> _maps = new java.util.HashMap<String, MetricsMap<?>>(); +}; diff --git a/java/src/IceInternal/ObjectAdapterFactory.java b/java/src/IceInternal/ObjectAdapterFactory.java index a402e03b4b5..0d24a641f0c 100644 --- a/java/src/IceInternal/ObjectAdapterFactory.java +++ b/java/src/IceInternal/ObjectAdapterFactory.java @@ -107,6 +107,36 @@ public final class ObjectAdapterFactory } } + public void + updateConnectionObservers() + { + java.util.List<Ice.ObjectAdapterI> adapters; + synchronized(this) + { + adapters = new java.util.LinkedList<Ice.ObjectAdapterI>(_adapters); + } + + for(Ice.ObjectAdapterI adapter : adapters) + { + adapter.updateConnectionObservers(); + } + } + + public void + updateThreadObservers() + { + java.util.List<Ice.ObjectAdapterI> adapters; + synchronized(this) + { + adapters = new java.util.LinkedList<Ice.ObjectAdapterI>(_adapters); + } + + for(Ice.ObjectAdapterI adapter : adapters) + { + adapter.updateThreadObservers(); + } + } + public synchronized Ice.ObjectAdapter createObjectAdapter(String name, Ice.RouterPrx router) { diff --git a/java/src/IceInternal/ObserverHelper.java b/java/src/IceInternal/ObserverHelper.java new file mode 100644 index 00000000000..754a57098bf --- /dev/null +++ b/java/src/IceInternal/ObserverHelper.java @@ -0,0 +1,65 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +import Ice.Instrumentation.CommunicatorObserver; +import Ice.Instrumentation.InvocationObserver; + +public final class ObserverHelper +{ + static public InvocationObserver + get(Instance instance, String op) + { + CommunicatorObserver obsv = instance.initializationData().observer; + if(obsv != null) + { + InvocationObserver observer = obsv.getInvocationObserver(null, op, _emptyContext); + if(observer != null) + { + observer.attach(); + } + return observer; + } + return null; + } + + static public InvocationObserver + get(Ice.ObjectPrx proxy, String op) + { + return get(proxy, op, null); + } + + static public InvocationObserver + get(Ice.ObjectPrx proxy, String op, java.util.Map<String, String> context) + { + CommunicatorObserver obsv = + ((Ice.ObjectPrxHelperBase)proxy).__reference().getInstance().initializationData().observer; + if(obsv != null) + { + InvocationObserver observer; + if(context == null) + { + observer = obsv.getInvocationObserver(proxy, op, _emptyContext); + } + else + { + observer = obsv.getInvocationObserver(proxy, op, context); + } + if(observer != null) + { + observer.attach(); + } + return observer; + } + return null; + } + + private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>(); +} diff --git a/java/src/IceInternal/OpaqueEndpointI.java b/java/src/IceInternal/OpaqueEndpointI.java index f40763a418a..5f0863251ac 100644 --- a/java/src/IceInternal/OpaqueEndpointI.java +++ b/java/src/IceInternal/OpaqueEndpointI.java @@ -14,7 +14,7 @@ final class OpaqueEndpointI extends EndpointI public OpaqueEndpointI(String str) { - super(Protocol_0_0, Encoding_0_0); + super(Protocol_0_0, Encoding_0_0, ""); _rawEncoding = Protocol.currentEncoding; @@ -148,7 +148,7 @@ final class OpaqueEndpointI extends EndpointI public OpaqueEndpointI(short type, BasicStream s) { - super(Protocol_0_0, Encoding_0_0); + super(Protocol_0_0, Encoding_0_0, ""); _type = type; _rawEncoding = s.startReadEncaps(); int sz = s.getReadEncapsSize(); diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java index aa07f8aa052..1a047293295 100644 --- a/java/src/IceInternal/Outgoing.java +++ b/java/src/IceInternal/Outgoing.java @@ -9,15 +9,20 @@ package IceInternal; +import Ice.Instrumentation.Observer; +import Ice.Instrumentation.InvocationObserver; + public final class Outgoing implements OutgoingMessageCallback { public - Outgoing(RequestHandler handler, String operation, Ice.OperationMode mode, java.util.Map<String, String> context) + Outgoing(RequestHandler handler, String operation, Ice.OperationMode mode, java.util.Map<String, String> context, + InvocationObserver observer) throws LocalExceptionWrapper { _state = StateUnsent; _sent = false; _handler = handler; + _observer = observer; _encoding = handler.getReference().getEncoding(); _os = new BasicStream(_handler.getReference().getInstance(), Protocol.currentProtocolEncoding); @@ -28,7 +33,8 @@ public final class Outgoing implements OutgoingMessageCallback // These functions allow this object to be reused, rather than reallocated. // public void - reset(RequestHandler handler, String operation, Ice.OperationMode mode, java.util.Map<String, String> context) + reset(RequestHandler handler, String operation, Ice.OperationMode mode, java.util.Map<String, String> context, + InvocationObserver observer) throws LocalExceptionWrapper { _state = StateUnsent; @@ -276,6 +282,12 @@ public final class Outgoing implements OutgoingMessageCallback // _sent = true; } + + if(_remoteObserver != null && _handler.getReference().getMode() != Reference.ModeTwoway) + { + _remoteObserver.detach(); + _remoteObserver = null; + } } public synchronized void @@ -285,6 +297,12 @@ public final class Outgoing implements OutgoingMessageCallback assert(_state <= StateInProgress); + if(_remoteObserver != null) + { + _remoteObserver.detach(); + _remoteObserver = null; + } + if(_is == null) { _is = new IceInternal.BasicStream(_handler.getReference().getInstance(), Protocol.currentProtocolEncoding); @@ -419,6 +437,11 @@ public final class Outgoing implements OutgoingMessageCallback finished(Ice.LocalException ex, boolean sent) { assert(_state <= StateInProgress); + if(_remoteObserver != null) + { + _remoteObserver.detach(); + _remoteObserver = null; + } _state = StateFailed; _exception = ex; _sent = sent; @@ -505,11 +528,28 @@ public final class Outgoing implements OutgoingMessageCallback } catch(Ice.UserException ex) { + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } _is.endReadEncaps(); throw ex; } } + public void + attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt) + { + if(_observer != null) + { + _remoteObserver = _observer.getRemoteObserver(info, endpt); + if(_remoteObserver != null) + { + _remoteObserver.attach(); + } + } + } + private void writeHeader(String operation, Ice.OperationMode mode, java.util.Map<String, String> context) throws LocalExceptionWrapper @@ -601,5 +641,8 @@ public final class Outgoing implements OutgoingMessageCallback private static final int StateFailed = 5; private int _state; + private InvocationObserver _observer; + private Observer _remoteObserver; + public Outgoing next; // For use by Ice._ObjectDelM } diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 2333c148c47..04fb8b0ebf3 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -31,6 +31,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa ctx = _emptyContext; } + _observer = ObserverHelper.get(_proxy, operation, ctx); + // // Can't call async via a batch proxy. // @@ -106,6 +108,11 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa { if(!_proxy.ice_isTwoway()) { + if(_remoteObserver != null) + { + _remoteObserver.detach(); + _remoteObserver = null; + } _state |= Done | OK; } else if(connection.timeout() > 0) @@ -131,6 +138,11 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa public void __sent() { __sentInternal(); + if(_observer != null && !_proxy.ice_isTwoway()) + { + _observer.detach(); + _observer = null; + } } public void __finished(Ice.LocalException exc, boolean sent) @@ -138,6 +150,11 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa synchronized(_monitor) { assert((_state & Done) == 0); + if(_remoteObserver != null) + { + _remoteObserver.detach(); + _remoteObserver = null; + } if(_timerTaskConnection != null) { _instance.timer().cancel(_timerTask); @@ -176,7 +193,13 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa // calling on the callback. The LocalExceptionWrapper exception is only called // before the invocation is sent. // - + + if(_remoteObserver != null) + { + _remoteObserver.detach(); + _remoteObserver = null; + } + try { int interval = handleException(exc); // This will throw if the invocation can't be retried. @@ -205,6 +228,11 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa synchronized(_monitor) { assert(_exception == null && (_state & Done) == 0); + if(_remoteObserver != null) + { + _remoteObserver.detach(); + _remoteObserver = null; + } if(_timerTaskConnection != null) { @@ -461,16 +489,16 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa { if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent) { - _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, interval, _cnt); + _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, interval, _cnt, _observer); } else { - _proxy.__handleExceptionWrapper(_delegate, ex); + _proxy.__handleExceptionWrapper(_delegate, ex, _observer); } } catch(Ice.LocalException ex) { - _cnt = _proxy.__handleException(_delegate, ex, interval, _cnt); + _cnt = _proxy.__handleException(_delegate, ex, interval, _cnt, _observer); } return interval.value; } @@ -480,11 +508,11 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa Ice.IntHolder interval = new Ice.IntHolder(0); if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent) { - _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, interval, _cnt); + _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, interval, _cnt, _observer); } else { - _proxy.__handleExceptionWrapper(_delegate, ex); + _proxy.__handleExceptionWrapper(_delegate, ex, _observer); } return interval.value; } diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 1e6fb700909..491b15a44d2 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -68,6 +68,18 @@ public final class OutgoingConnectionFactory notifyAll(); } + public synchronized void + updateConnectionObservers() + { + for(java.util.List<Ice.ConnectionI> connectionList : _connections.values()) + { + for(Ice.ConnectionI connection : connectionList) + { + connection.updateObserver(); + } + } + } + public void waitUntilFinished() { @@ -214,16 +226,33 @@ public final class OutgoingConnectionFactory // Try to establish the connection to the connectors. // DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; java.util.Iterator<ConnectorInfo> q = connectors.iterator(); ConnectorInfo ci = null; while(q.hasNext()) { ci = q.next(); + + Ice.Instrumentation.Observer observer = null; + if(obsv != null) + { + observer = obsv.getConnectionEstablishmentObserver(ci.endpoint, ci.connector.toString()); + if(observer != null) + { + observer.attach(); + } + } + try { connection = createConnection(ci.connector.connect(), ci); connection.start(null); + if(observer != null) + { + observer.detach(); + } + if(defaultsAndOverrides.overrideCompress) { compress.value = defaultsAndOverrides.overrideCompressValue; @@ -237,6 +266,11 @@ public final class OutgoingConnectionFactory } catch(Ice.CommunicatorDestroyedException ex) { + if(observer != null) + { + observer.failed(ex.ice_name()); + observer.detach(); + } exception = ex; handleConnectionException(exception, hasMore || p.hasNext()); connection = null; @@ -244,6 +278,11 @@ public final class OutgoingConnectionFactory } catch(Ice.LocalException ex) { + if(observer != null) + { + observer.failed(ex.ice_name()); + observer.detach(); + } exception = ex; handleConnectionException(exception, hasMore || p.hasNext()); connection = null; @@ -979,6 +1018,10 @@ public final class OutgoingConnectionFactory public void connectionStartCompleted(Ice.ConnectionI connection) { + if(_observer != null) + { + _observer.detach(); + } connection.activate(); _factory.finishGetConnection(_connectors, _current, connection, this); } @@ -988,6 +1031,12 @@ public final class OutgoingConnectionFactory { assert(_current != null); + if(_observer != null) + { + _observer.failed(ex.ice_name()); + _observer.detach(); + } + _factory.handleConnectionException(ex, _hasMore || _iter.hasNext()); if(ex instanceof Ice.CommunicatorDestroyedException) // No need to continue. { @@ -1180,6 +1229,18 @@ public final class OutgoingConnectionFactory { assert(_iter.hasNext()); _current = _iter.next(); + + Ice.Instrumentation.CommunicatorObserver obsv = _factory._instance.initializationData().observer; + if(obsv != null) + { + _observer = obsv.getConnectionEstablishmentObserver(_current.endpoint, + _current.connector.toString()); + if(_observer != null) + { + _observer.attach(); + } + } + connection = _factory.createConnection(_current.connector.connect(), _current); connection.start(this); } @@ -1199,6 +1260,7 @@ public final class OutgoingConnectionFactory private java.util.List<ConnectorInfo> _connectors = new java.util.ArrayList<ConnectorInfo>(); private java.util.Iterator<ConnectorInfo> _iter; private ConnectorInfo _current; + private Ice.Instrumentation.Observer _observer; } private Ice.Communicator _communicator; diff --git a/java/src/IceInternal/ProxyBatchOutgoingAsync.java b/java/src/IceInternal/ProxyBatchOutgoingAsync.java index 21ff5b9da1c..9d325d21d44 100644 --- a/java/src/IceInternal/ProxyBatchOutgoingAsync.java +++ b/java/src/IceInternal/ProxyBatchOutgoingAsync.java @@ -16,6 +16,7 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase)prx).__reference().getInstance(), operation, callback); _proxy = prx; + _observer = ObserverHelper.get(prx, operation); } public void __send() @@ -41,7 +42,7 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync } catch(Ice.LocalException __ex) { - cnt = ((Ice.ObjectPrxHelperBase)_proxy).__handleException(delegate, __ex, null, cnt); + cnt = ((Ice.ObjectPrxHelperBase)_proxy).__handleException(delegate, __ex, null, cnt, _observer); } } diff --git a/java/src/IceInternal/RequestHandler.java b/java/src/IceInternal/RequestHandler.java index 247b2d86f77..c0c6cacbed6 100644 --- a/java/src/IceInternal/RequestHandler.java +++ b/java/src/IceInternal/RequestHandler.java @@ -9,6 +9,8 @@ package IceInternal; +import Ice.Instrumentation.InvocationObserver; + public interface RequestHandler { void prepareBatchRequest(BasicStream out) @@ -29,7 +31,8 @@ public interface RequestHandler Ice.ConnectionI getConnection(boolean wait); - Outgoing getOutgoing(String operation, Ice.OperationMode mode, java.util.Map<String, String> context) + Outgoing getOutgoing(String operation, Ice.OperationMode mode, java.util.Map<String, String> context, + InvocationObserver observer) throws LocalExceptionWrapper; void reclaimOutgoing(Outgoing out); diff --git a/java/src/IceInternal/TcpEndpointI.java b/java/src/IceInternal/TcpEndpointI.java index 18426c4494d..8e5f6d1b5d0 100644 --- a/java/src/IceInternal/TcpEndpointI.java +++ b/java/src/IceInternal/TcpEndpointI.java @@ -15,12 +15,11 @@ final class TcpEndpointI extends EndpointI TcpEndpointI(Instance instance, String ho, int po, int ti, Ice.ProtocolVersion pv, Ice.EncodingVersion ev, String conId, boolean co) { - super(pv, ev); + super(pv, ev, conId); _instance = instance; _host = ho; _port = po; _timeout = ti; - _connectionId = conId; _compress = co; calcHashValue(); } @@ -28,7 +27,7 @@ final class TcpEndpointI extends EndpointI public TcpEndpointI(Instance instance, String str, boolean oaEndpoint) { - super(Protocol.currentProtocol, instance.defaultsAndOverrides().defaultEncoding); + super(Protocol.currentProtocol, instance.defaultsAndOverrides().defaultEncoding, ""); _instance = instance; _host = null; _port = 0; @@ -172,7 +171,7 @@ final class TcpEndpointI extends EndpointI public TcpEndpointI(BasicStream s) { - super(new Ice.ProtocolVersion(), new Ice.EncodingVersion()); + super(new Ice.ProtocolVersion(), new Ice.EncodingVersion(), ""); _instance = s.instance(); s.startReadEncaps(); _host = s.readString(); @@ -525,11 +524,6 @@ final class TcpEndpointI extends EndpointI return 1; } - if(!_connectionId.equals(p._connectionId)) - { - return _connectionId.compareTo(p._connectionId); - } - if(!_compress && p._compress) { return -1; @@ -572,7 +566,6 @@ final class TcpEndpointI extends EndpointI private String _host; private int _port; private int _timeout; - private String _connectionId = ""; private boolean _compress; private int _hashCode; } diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java index 93349213e84..77a67e592ee 100644 --- a/java/src/IceInternal/TcpTransceiver.java +++ b/java/src/IceInternal/TcpTransceiver.java @@ -225,20 +225,17 @@ final class TcpTransceiver implements Transceiver public Ice.ConnectionInfo getInfo() { - assert(_fd != null); Ice.TCPConnectionInfo info = new Ice.TCPConnectionInfo(); - java.net.Socket socket = _fd.socket(); - info.localAddress = socket.getLocalAddress().getHostAddress(); - info.localPort = socket.getLocalPort(); - if(socket.getInetAddress() != null) - { - info.remoteAddress = socket.getInetAddress().getHostAddress(); - info.remotePort = socket.getPort(); - } - else + if(_fd != null) { - info.remoteAddress = ""; - info.remotePort = -1; + java.net.Socket socket = _fd.socket(); + info.localAddress = socket.getLocalAddress().getHostAddress(); + info.localPort = socket.getLocalPort(); + if(socket.getInetAddress() != null) + { + info.remoteAddress = socket.getInetAddress().getHostAddress(); + info.remotePort = socket.getPort(); + } } return info; } diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index e2c6cc5cd78..e3eecdd817a 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -232,6 +232,15 @@ public final class ThreadPool } public synchronized void + updateObservers() + { + for(EventHandlerThread thread : _threads) + { + thread.updateObserver(); + } + } + + public synchronized void initialize(EventHandler handler) { assert(!_destroyed); @@ -301,12 +310,13 @@ public final class ThreadPool private void run(EventHandlerThread thread) { - ThreadPoolCurrent current = new ThreadPoolCurrent(_instance, this); + ThreadPoolCurrent current = new ThreadPoolCurrent(_instance, this, thread); boolean select = false; while(true) { if(current._handler != null) { + thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO); try { current._handler.message(current); @@ -324,6 +334,7 @@ public final class ThreadPool } else if(select) { + thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle); try { _selector.select(_serverIdleTime); @@ -351,7 +362,7 @@ public final class ThreadPool _nextHandler = _handlers.iterator(); select = false; } - else if(!current._leader && followerWait(thread, current)) + else if(!current._leader && followerWait(current)) { return; // Wait timed-out. } @@ -382,7 +393,7 @@ public final class ThreadPool --_inUse; } - if(!current._leader && followerWait(thread, current)) + if(!current._leader && followerWait(current)) { return; // Wait timed-out. } @@ -446,6 +457,8 @@ public final class ThreadPool { current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called. + current._thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser); + if(_sizeMax > 1) { synchronized(this) @@ -539,10 +552,12 @@ public final class ThreadPool } private synchronized boolean - followerWait(EventHandlerThread thread, ThreadPoolCurrent current) + followerWait(ThreadPoolCurrent current) { assert(!current._leader); + current._thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle); + // // It's important to clear the handler before waiting to make sure that // resources for the handler are released now if it's finished. We also @@ -573,8 +588,8 @@ public final class ThreadPool _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); } assert(_threads.size() > 1); // Can only be called by a waiting follower thread. - _threads.remove(thread); - _workQueue.queue(new JoinThreadWorkItem(thread)); + _threads.remove(current._thread); + _workQueue.queue(new JoinThreadWorkItem(current._thread)); return true; } } @@ -600,11 +615,40 @@ public final class ThreadPool private final String _threadPrefix; private final Selector _selector; - private final class EventHandlerThread implements Runnable + final class EventHandlerThread implements Runnable { EventHandlerThread(String name) { _name = name; + _state = Ice.Instrumentation.ThreadState.ThreadStateIdle; + updateObserver(); + } + + public void + updateObserver() + { + Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + if(obsv != null) + { + _observer = obsv.getThreadObserver(_prefix, _name, _state, _observer); + if(_observer != null) + { + _observer.attach(); + } + } + } + + void + setState(Ice.Instrumentation.ThreadState s) + { + if(_observer != null) + { + if(_state != s) + { + _observer.stateChanged(_state, s); + } + } + _state = s; } public void @@ -658,6 +702,11 @@ public final class ThreadPool _instance.initializationData().logger.error(s); } + if(_observer != null) + { + _observer.detach(); + } + if(_instance.initializationData().threadHook != null) { try @@ -675,6 +724,8 @@ public final class ThreadPool final private String _name; private Thread _thread; + private Ice.Instrumentation.ThreadState _state; + private Ice.Instrumentation.ThreadObserver _observer; } private final int _size; // Number of threads that are pre-created. diff --git a/java/src/IceInternal/ThreadPoolCurrent.java b/java/src/IceInternal/ThreadPoolCurrent.java index 02d471b4af2..93440e73e91 100644 --- a/java/src/IceInternal/ThreadPoolCurrent.java +++ b/java/src/IceInternal/ThreadPoolCurrent.java @@ -11,12 +11,13 @@ package IceInternal; public final class ThreadPoolCurrent { - ThreadPoolCurrent(Instance instance, ThreadPool threadPool) + ThreadPoolCurrent(Instance instance, ThreadPool threadPool, ThreadPool.EventHandlerThread thread) { operation = SocketOperation.None; stream = new BasicStream(instance, Protocol.currentProtocolEncoding); _threadPool = threadPool; + _thread = thread; _ioCompleted = false; _leader = false; } @@ -31,6 +32,7 @@ public final class ThreadPoolCurrent } final ThreadPool _threadPool; + final ThreadPool.EventHandlerThread _thread; EventHandler _handler; boolean _ioCompleted; boolean _leader; diff --git a/java/src/IceInternal/UdpEndpointI.java b/java/src/IceInternal/UdpEndpointI.java index 8e78a33079c..15d43429e62 100644 --- a/java/src/IceInternal/UdpEndpointI.java +++ b/java/src/IceInternal/UdpEndpointI.java @@ -15,14 +15,13 @@ final class UdpEndpointI extends EndpointI UdpEndpointI(Instance instance, String ho, int po, String mif, int mttl, Ice.ProtocolVersion p, Ice.EncodingVersion e, boolean conn, String conId, boolean co) { - super(p, e); + super(p, e, conId); _instance = instance; _host = ho; _port = po; _mcastInterface = mif; _mcastTtl = mttl; _connect = conn; - _connectionId = conId; _compress = co; calcHashValue(); } @@ -30,7 +29,7 @@ final class UdpEndpointI extends EndpointI public UdpEndpointI(Instance instance, String str, boolean oaEndpoint) { - super(Protocol.currentProtocol, instance.defaultsAndOverrides().defaultEncoding); + super(Protocol.currentProtocol, instance.defaultsAndOverrides().defaultEncoding, ""); _instance = instance; _host = null; _port = 0; @@ -188,7 +187,7 @@ final class UdpEndpointI extends EndpointI public UdpEndpointI(BasicStream s) { - super(new Ice.ProtocolVersion(), new Ice.EncodingVersion()); + super(new Ice.ProtocolVersion(), new Ice.EncodingVersion(), ""); _instance = s.instance(); s.startReadEncaps(); _host = s.readString(); @@ -536,11 +535,6 @@ final class UdpEndpointI extends EndpointI return 1; } - if(!_connectionId.equals(p._connectionId)) - { - return _connectionId.compareTo(p._connectionId); - } - if(!_compress && p._compress) { return -1; @@ -603,7 +597,6 @@ final class UdpEndpointI extends EndpointI private String _mcastInterface = ""; private int _mcastTtl = -1; private boolean _connect; - private String _connectionId = ""; private boolean _compress; private int _hashCode; } diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java index 1d72e954f21..260bdd50774 100644 --- a/java/src/IceInternal/UdpTransceiver.java +++ b/java/src/IceInternal/UdpTransceiver.java @@ -228,36 +228,27 @@ final class UdpTransceiver implements Transceiver public Ice.ConnectionInfo getInfo() { - assert(_fd != null); - Ice.UDPConnectionInfo info = new Ice.UDPConnectionInfo(); - java.net.DatagramSocket socket = _fd.socket(); - info.localAddress = socket.getLocalAddress().getHostAddress(); - info.localPort = socket.getLocalPort(); - if(_state == StateNotConnected) + if(_fd != null) { - if(_peerAddr != null) - { - info.remoteAddress = _peerAddr.getAddress().getHostAddress(); - info.remotePort = _peerAddr.getPort(); - } - else + java.net.DatagramSocket socket = _fd.socket(); + info.localAddress = socket.getLocalAddress().getHostAddress(); + info.localPort = socket.getLocalPort(); + if(_state == StateNotConnected) { - info.remoteAddress = ""; - info.remotePort = -1; - } - } - else - { - if(socket.getInetAddress() != null) - { - info.remoteAddress = socket.getInetAddress().getHostAddress(); - info.remotePort = socket.getPort(); + if(_peerAddr != null) + { + info.remoteAddress = _peerAddr.getAddress().getHostAddress(); + info.remotePort = _peerAddr.getPort(); + } } else { - info.remoteAddress = ""; - info.remotePort = -1; + if(socket.getInetAddress() != null) + { + info.remoteAddress = socket.getInetAddress().getHostAddress(); + info.remotePort = socket.getPort(); + } } } if(_mcastAddr != null) @@ -265,11 +256,6 @@ final class UdpTransceiver implements Transceiver info.mcastAddress = _mcastAddr.getAddress().getHostAddress(); info.mcastPort = _mcastAddr.getPort(); } - else - { - info.mcastAddress = ""; - info.mcastPort = -1; - } return info; } diff --git a/java/src/IceMX/CommunicatorObserverI.java b/java/src/IceMX/CommunicatorObserverI.java new file mode 100644 index 00000000000..db1d83f0c90 --- /dev/null +++ b/java/src/IceMX/CommunicatorObserverI.java @@ -0,0 +1,674 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceMX; + +public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorObserver +{ + static void + addEndpointAttributes(MetricsHelper.AttributeResolver r, Class<?> cl) + throws Exception + { + Class<?> cli = Ice.EndpointInfo.class; + r.add("endpointType", cl.getDeclaredMethod("getEndpointInfo"), cli.getDeclaredMethod("type")); + r.add("endpointIsDatagram", cl.getDeclaredMethod("getEndpointInfo"), cli.getDeclaredMethod("datagram")); + r.add("endpointIsSecure", cl.getDeclaredMethod("getEndpointInfo"), cli.getDeclaredMethod("secure")); + r.add("endpointProtocolVersion", cl.getDeclaredMethod("getEndpointInfo"), cli.getDeclaredField("protocol")); + r.add("endpointEncodingVersion", cl.getDeclaredMethod("getEndpointInfo"), cli.getDeclaredField("encoding")); + r.add("endpointTimeout", cl.getDeclaredMethod("getEndpointInfo"), cli.getDeclaredField("timeout")); + r.add("endpointCompress", cl.getDeclaredMethod("getEndpointInfo"), cli.getDeclaredField("compress")); + + cli = Ice.IPEndpointInfo.class; + r.add("endpointHost", cl.getDeclaredMethod("getEndpointInfo"), cli.getDeclaredField("host")); + r.add("endpointPort", cl.getDeclaredMethod("getEndpointInfo"), cli.getDeclaredField("port")); + } + + static void + addConnectionAttributes(MetricsHelper.AttributeResolver r, Class<?> cl) + throws Exception + { + Class<?> cli = Ice.ConnectionInfo.class; + r.add("incoming", cl.getDeclaredMethod("getConnectionInfo"), cli.getDeclaredField("incoming")); + r.add("adapterName", cl.getDeclaredMethod("getConnectionInfo"), cli.getDeclaredField("adapterName")); + r.add("connectionId", cl.getDeclaredMethod("getConnectionInfo"), cli.getDeclaredField("connectionId")); + + cli = Ice.IPConnectionInfo.class; + r.add("localHost", cl.getDeclaredMethod("getConnectionInfo"), cli.getDeclaredField("localAddress")); + r.add("localPort", cl.getDeclaredMethod("getConnectionInfo"), cli.getDeclaredField("localPort")); + r.add("remoteHost", cl.getDeclaredMethod("getConnectionInfo"), cli.getDeclaredField("remoteAddress")); + r.add("remotePort", cl.getDeclaredMethod("getConnectionInfo"), cli.getDeclaredField("remotePort")); + + cli = Ice.UDPConnectionInfo.class; + r.add("mcastHost", cl.getDeclaredMethod("getConnectionInfo"), cli.getDeclaredField("mcastAddress")); + r.add("mcastPort", cl.getDeclaredMethod("getConnectionInfo"), cli.getDeclaredField("mcastPort")); + + addEndpointAttributes(r, cl); + } + + static private class ConnectionHelper extends MetricsHelper<ConnectionMetrics> + { + static private AttributeResolver _attributes = new AttributeResolver() + { + { + try + { + add("parent", ConnectionHelper.class.getDeclaredMethod("getParent")); + add("id", ConnectionHelper.class.getDeclaredMethod("getId")); + add("endpoint", ConnectionHelper.class.getDeclaredMethod("getEndpoint")); + add("state", ConnectionHelper.class.getDeclaredMethod("getState")); + addConnectionAttributes(this, ConnectionHelper.class); + } + catch(Exception ex) + { + ex.printStackTrace(); + assert(false); + } + } + }; + + ConnectionHelper(Ice.ConnectionInfo con, Ice.Endpoint endpt, Ice.Instrumentation.ConnectionState state) + { + super(_attributes); + _connectionInfo = con; + _endpoint = endpt; + _state = state; + } + + String + getId() + { + if(_id == null) + { + StringBuilder os = new StringBuilder(); + if(_connectionInfo instanceof Ice.IPConnectionInfo) + { + Ice.IPConnectionInfo info = (Ice.IPConnectionInfo)_connectionInfo; + os.append(info.localAddress).append(':').append(info.localPort); + os.append(" -> "); + os.append(info.remoteAddress).append(':').append(info.remotePort); + } + else + { + os.append("connection-").append(_connectionInfo); + } + if(!_connectionInfo.connectionId.isEmpty()) + { + os.append(" [").append(_connectionInfo.connectionId).append("]"); + } + _id = os.toString(); + } + return _id; + } + + String + getState() + { + switch(_state) + { + case ConnectionStateValidating: + return "validating"; + case ConnectionStateHolding: + return "holding"; + case ConnectionStateActive: + return "active"; + case ConnectionStateClosing: + return "closing"; + case ConnectionStateClosed: + return "closed"; + default: + assert(false); + return ""; + } + } + + String + getParent() + { + if(_connectionInfo.adapterName != null && !_connectionInfo.adapterName.isEmpty()) + { + return _connectionInfo.adapterName; + } + else + { + return "Communicator"; + } + } + + Ice.ConnectionInfo + getConnectionInfo() + { + return _connectionInfo; + } + + Ice.Endpoint + getEndpoint() + { + return _endpoint; + } + + Ice.EndpointInfo + getEndpointInfo() + { + if(_endpointInfo == null) + { + _endpointInfo = _endpoint.getInfo(); + } + return _endpointInfo; + } + + private final Ice.ConnectionInfo _connectionInfo; + private final Ice.Endpoint _endpoint; + private final Ice.Instrumentation.ConnectionState _state; + private String _id; + private Ice.EndpointInfo _endpointInfo; + }; + + static private final class DispatchHelper extends MetricsHelper<Metrics> + { + static private final AttributeResolver _attributes = new AttributeResolver() + { + { + try + { + Class<?> cl = DispatchHelper.class; + add("parent", cl.getDeclaredMethod("getParent")); + add("id", cl.getDeclaredMethod("getId")); + add("endpoint", cl.getDeclaredMethod("getEndpoint")); + add("connection", cl.getDeclaredMethod("getConnection")); + + addConnectionAttributes(this, cl); + + Class<?> clc = Ice.Current.class; + add("operation", cl.getDeclaredMethod("getCurrent"), clc.getDeclaredField("operation")); + add("identity", cl.getDeclaredMethod("getIdentity")); + add("facet", cl.getDeclaredMethod("getCurrent"), clc.getDeclaredField("facet")); + add("mode", cl.getDeclaredMethod("getMode")); + } + catch(Exception ex) + { + ex.printStackTrace(); + assert(false); + } + } + }; + + DispatchHelper(Ice.Current current) + { + super(_attributes); + _current = current; + } + + protected String + defaultResolve(String attribute) + { + if(attribute.indexOf("context.", 0) == 0) + { + String v = _current.ctx.get(attribute.substring(8)); + if(v != null) + { + return v; + } + } + throw new IllegalArgumentException(attribute); + } + + String + getMode() + { + return _current.requestId == 0 ? "oneway" : "twoway"; + } + + String + getId() + { + if(_id == null) + { + StringBuilder os = new StringBuilder(); + if(_current.id.category != null && !_current.id.category.isEmpty()) + { + os.append(_current.id.category).append('/'); + } + os.append(_current.id.name).append(" [").append(_current.operation).append(']'); + _id = os.toString(); + } + return _id; + } + + String + getParent() + { + return _current.adapter.getName(); + } + + Ice.ConnectionInfo + getConnectionInfo() + { + return _current.con.getInfo(); + } + + Ice.Endpoint + getEndpoint() + { + return _current.con.getEndpoint(); + } + + Ice.Connection + getConnection() + { + return _current.con; + } + + Ice.EndpointInfo + getEndpointInfo() + { + if(_endpointInfo == null) + { + _endpointInfo = _current.con.getEndpoint().getInfo(); + } + return _endpointInfo; + } + + Ice.Current + getCurrent() + { + return _current; + } + + String + getIdentity() + { + return _current.adapter.getCommunicator().identityToString(_current.id); + } + + final private Ice.Current _current; + private String _id; + private Ice.EndpointInfo _endpointInfo; + }; + + static private final class InvocationHelper extends MetricsHelper<InvocationMetrics> + { + static private final AttributeResolver _attributes = new AttributeResolver() + { + { + try + { + Class<?> cl = InvocationHelper.class; + add("parent", cl.getDeclaredMethod("getParent")); + add("id", cl.getDeclaredMethod("getId")); + + add("operation", cl.getDeclaredMethod("getOperation")); + add("identity", cl.getDeclaredMethod("getIdentity")); + + Class<?> cli = Ice.ObjectPrx.class; + add("facet", cl.getDeclaredMethod("getProxy"), cli.getDeclaredMethod("ice_getFacet")); + add("encoding", cl.getDeclaredMethod("getProxy"), + cli.getDeclaredMethod("ice_getEncodingVersion")); + add("mode", cl.getDeclaredMethod("getMode")); + add("proxy", cl.getDeclaredMethod("getProxy")); + } + catch(Exception ex) + { + ex.printStackTrace(); + assert(false); + } + } + }; + + InvocationHelper(Ice.ObjectPrx proxy, String op, java.util.Map<String, String> ctx) + { + super(_attributes); + _proxy = proxy; + _operation = op; + _context = ctx; + } + + protected String + defaultResolve(String attribute) + { + if(attribute.indexOf("context.", 0) == 0) + { + String v = _context.get(attribute.substring(8)); + if(v != null) + { + return v; + } + } + throw new IllegalArgumentException(attribute); + } + + String + getMode() + { + if(_proxy == null) + { + throw new IllegalArgumentException("mode"); + } + + if(_proxy.ice_isTwoway()) + { + return "twoway"; + } + else if(_proxy.ice_isOneway()) + { + return "oneway"; + } + else if(_proxy.ice_isBatchOneway()) + { + return "batch-oneway"; + } + else if(_proxy.ice_isDatagram()) + { + return "datagram"; + } + else if(_proxy.ice_isBatchDatagram()) + { + return "batch-datagram"; + } + else + { + throw new IllegalArgumentException("mode"); + } + } + + String + getId() + { + if(_id == null) + { + if(_proxy != null) + { + StringBuilder os = new StringBuilder(); + try + { + os.append(_proxy).append(" [").append(_operation).append(']'); + } + catch(Ice.FixedProxyException ex) + { + os.append(_proxy.ice_getCommunicator().identityToString(_proxy.ice_getIdentity())); + os.append(" [").append(_operation).append(']'); + } + _id = os.toString(); + } + else + { + _id = _operation; + } + } + return _id; + } + + String + getParent() + { + return "Communicator"; + } + + Ice.ObjectPrx + getProxy() + { + return _proxy; + } + + + String + getIdentity() + { + if(_proxy != null) + { + return _proxy.ice_getCommunicator().identityToString(_proxy.ice_getIdentity()); + } + else + { + return ""; + } + } + + String + getOperation() + { + return _operation; + } + + final private Ice.ObjectPrx _proxy; + final private String _operation; + final private java.util.Map<String, String> _context; + private String _id; + }; + + static private final class ThreadHelper extends MetricsHelper<ThreadMetrics> + { + static private final AttributeResolver _attributes = new AttributeResolver() + { + { + try + { + add("parent", ThreadHelper.class.getDeclaredField("_parent")); + add("id", ThreadHelper.class.getDeclaredField("_id")); + } + catch(Exception ex) + { + assert(false); + } + } + }; + + ThreadHelper(String parent, String id, Ice.Instrumentation.ThreadState state) + { + super(_attributes); + _parent = parent; + _id = id; + _state = state; + } + + public void + initMetrics(ThreadMetrics v) + { + switch(_state) + { + case ThreadStateInUseForIO: + ++v.inUseForIO; + break; + case ThreadStateInUseForUser: + ++v.inUseForUser; + break; + case ThreadStateInUseForOther: + ++v.inUseForOther; + break; + default: + break; + } + } + + final String _parent; + final String _id; + final private Ice.Instrumentation.ThreadState _state; + }; + + static private final class EndpointHelper extends MetricsHelper<Metrics> + { + static private final AttributeResolver _attributes = new AttributeResolver() + { + { + try + { + add("parent", EndpointHelper.class.getDeclaredMethod("getParent")); + add("id", EndpointHelper.class.getDeclaredMethod("getId")); + add("endpoint", EndpointHelper.class.getDeclaredMethod("getEndpoint")); + addEndpointAttributes(this, EndpointHelper.class); + } + catch(Exception ex) + { + ex.printStackTrace(); + assert(false); + } + } + }; + + EndpointHelper(Ice.Endpoint endpt, String id) + { + super(_attributes); + _endpoint = endpt; + _id = id; + } + + EndpointHelper(Ice.Endpoint endpt) + { + super(_attributes); + _endpoint = endpt; + } + + Ice.EndpointInfo + getEndpointInfo() + { + if(_endpointInfo == null) + { + _endpointInfo = _endpoint.getInfo(); + } + return _endpointInfo; + } + + String + getParent() + { + return "Communicator"; + } + + String + getId() + { + if(_id == null) + { + _id = _endpoint.toString(); + } + return _id; + } + + String + getEndpoint() + { + return _endpoint.toString(); + } + + final private Ice.Endpoint _endpoint; + private String _id; + private Ice.EndpointInfo _endpointInfo; + }; + + public + CommunicatorObserverI(IceInternal.MetricsAdminI metrics) + { + _metrics = metrics; + + _connections = new ObserverFactory<ConnectionMetrics, ConnectionObserverI>(metrics, "Connection", + ConnectionMetrics.class); + _dispatch = new ObserverFactory<Metrics, ObserverI>(metrics, "Dispatch", Metrics.class); + _invocations = new ObserverFactory<InvocationMetrics, InvocationObserverI>(metrics, "Invocation", + InvocationMetrics.class); + _threads = new ObserverFactory<ThreadMetrics, ThreadObserverI>(metrics, "Thread", ThreadMetrics.class); + _connects = new ObserverFactory<Metrics, ObserverI>(metrics, "ConnectionEstablishment", Metrics.class); + _endpointLookups = new ObserverFactory<Metrics, ObserverI>(metrics, "EndpointLookup", Metrics.class); + + try + { + _invocations.registerSubMap("Remote", Metrics.class, InvocationMetrics.class.getDeclaredField("remotes")); + } + catch(Exception ex) + { + assert(false); + } + } + + public Ice.Instrumentation.Observer + getConnectionEstablishmentObserver(Ice.Endpoint endpt, String connector) + { + if(_connects.isEnabled()) + { + return _connects.getObserver(new EndpointHelper(endpt, connector), ObserverI.class); + } + return null; + } + + public Ice.Instrumentation.Observer + getEndpointLookupObserver(Ice.Endpoint endpt) + { + if(_endpointLookups.isEnabled()) + { + return _endpointLookups.getObserver(new EndpointHelper(endpt), ObserverI.class); + } + return null; + } + + public Ice.Instrumentation.ConnectionObserver + getConnectionObserver(Ice.ConnectionInfo c, Ice.Endpoint e, Ice.Instrumentation.ConnectionState s, + Ice.Instrumentation.ConnectionObserver o) + { + if(_connections.isEnabled()) + { + return _connections.getObserver(new ConnectionHelper(c, e, s), o, ConnectionObserverI.class); + } + return null; + } + + public Ice.Instrumentation.ThreadObserver + getThreadObserver(String parent, String id, Ice.Instrumentation.ThreadState s, Ice.Instrumentation.ThreadObserver o) + { + if(_threads.isEnabled()) + { + return _threads.getObserver(new ThreadHelper(parent, id, s), o, ThreadObserverI.class); + } + return null; + } + + public Ice.Instrumentation.InvocationObserver + getInvocationObserver(Ice.ObjectPrx prx, String operation, java.util.Map<java.lang.String, java.lang.String> ctx) + { + if(_invocations.isEnabled()) + { + return _invocations.getObserver(new InvocationHelper(prx, operation, ctx), InvocationObserverI.class); + } + return null; + } + + public Ice.Instrumentation.Observer + getDispatchObserver(Ice.Current c) + { + if(_dispatch.isEnabled()) + { + return _dispatch.getObserver(new DispatchHelper(c), ObserverI.class); + } + return null; + } + + public void + setObserverUpdater(final Ice.Instrumentation.ObserverUpdater updater) + { + _connections.setUpdater(new Runnable() { + public void + run() + { + updater.updateConnectionObservers(); + } + }); + _threads.setUpdater(new Runnable() { + public void + run() + { + updater.updateThreadObservers(); + } + }); + } + + final private IceInternal.MetricsAdminI _metrics; + final private ObserverFactory<ConnectionMetrics, ConnectionObserverI> _connections; + final private ObserverFactory<Metrics, ObserverI> _dispatch; + final private ObserverFactory<InvocationMetrics, InvocationObserverI> _invocations; + final private ObserverFactory<ThreadMetrics, ThreadObserverI> _threads; + final private ObserverFactory<Metrics, ObserverI> _connects; + final private ObserverFactory<Metrics, ObserverI> _endpointLookups; +} diff --git a/java/src/IceMX/ConnectionObserverI.java b/java/src/IceMX/ConnectionObserverI.java new file mode 100644 index 00000000000..6371c06c2ba --- /dev/null +++ b/java/src/IceMX/ConnectionObserverI.java @@ -0,0 +1,48 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceMX; + +public class ConnectionObserverI extends Observer<ConnectionMetrics> implements Ice.Instrumentation.ConnectionObserver +{ + public void + sentBytes(final int num) + { + _sentBytes = num; + forEach(_sentBytesUpdate); + } + + public void + receivedBytes(int num) + { + _receivedBytes = num; + forEach(_receivedBytesUpdate); + } + + private MetricsUpdate<ConnectionMetrics> _sentBytesUpdate = new MetricsUpdate<ConnectionMetrics>() + { + public void + update(ConnectionMetrics v) + { + v.sentBytes += _sentBytes; + } + }; + + private MetricsUpdate<ConnectionMetrics> _receivedBytesUpdate = new MetricsUpdate<ConnectionMetrics>() + { + public void + update(ConnectionMetrics v) + { + v.receivedBytes += _receivedBytes; + } + }; + + private int _sentBytes; + private int _receivedBytes; +}
\ No newline at end of file diff --git a/java/src/IceMX/InvocationObserverI.java b/java/src/IceMX/InvocationObserverI.java new file mode 100644 index 00000000000..b581cb1723c --- /dev/null +++ b/java/src/IceMX/InvocationObserverI.java @@ -0,0 +1,117 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceMX; + +public class InvocationObserverI extends Observer<InvocationMetrics> implements Ice.Instrumentation.InvocationObserver +{ + static private final class RemoteInvocationHelper extends MetricsHelper<Metrics> + { + static private final AttributeResolver _attributes = new AttributeResolver() + { + { + try + { + Class<?> cl = RemoteInvocationHelper.class; + add("parent", cl.getDeclaredMethod("getParent")); + add("id", cl.getDeclaredMethod("getId")); + add("endpoint", cl.getDeclaredMethod("getEndpoint")); + CommunicatorObserverI.addConnectionAttributes(this, RemoteInvocationHelper.class); + } + catch(Exception ex) + { + ex.printStackTrace(); + assert(false); + } + } + }; + + RemoteInvocationHelper(Ice.ConnectionInfo con, Ice.Endpoint endpt) + { + super(_attributes); + _connectionInfo = con; + _endpoint = endpt; + } + + String + getId() + { + if(_id == null) + { + _id = _endpoint.toString(); + if(_connectionInfo.connectionId != null && !_connectionInfo.connectionId.isEmpty()) + { + _id += " [" + _connectionInfo.connectionId + "]"; + } + } + return _id; + } + + String + getParent() + { + if(_connectionInfo.adapterName != null && !_connectionInfo.adapterName.isEmpty()) + { + return _connectionInfo.adapterName; + } + else + { + return "Communicator"; + } + } + + Ice.ConnectionInfo + getConnectionInfo() + { + return _connectionInfo; + } + + Ice.Endpoint + getEndpoint() + { + return _endpoint; + } + + Ice.EndpointInfo + getEndpointInfo() + { + if(_endpointInfo == null) + { + _endpointInfo = _endpoint.getInfo(); + } + return _endpointInfo; + } + + final private Ice.ConnectionInfo _connectionInfo; + final private Ice.Endpoint _endpoint; + private String _id; + private Ice.EndpointInfo _endpointInfo; + }; + + public void + retried() + { + forEach(_incrementRetry); + } + + public Observer + getRemoteObserver(Ice.ConnectionInfo con, Ice.Endpoint endpt) + { + return getObserver("Remote", new RemoteInvocationHelper(con, endpt), Metrics.class, ObserverI.class); + } + + final MetricsUpdate<InvocationMetrics> _incrementRetry = new MetricsUpdate<InvocationMetrics>() + { + public void + update(InvocationMetrics v) + { + ++v.retry; + } + }; +}
\ No newline at end of file diff --git a/java/src/IceMX/MetricsHelper.java b/java/src/IceMX/MetricsHelper.java new file mode 100644 index 00000000000..32f820ae4fb --- /dev/null +++ b/java/src/IceMX/MetricsHelper.java @@ -0,0 +1,166 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceMX; + +public class MetricsHelper<T> +{ + static class AttributeResolver + { + private abstract class Resolver + { + Resolver(String name) + { + _name = name; + } + + abstract Object resolve(Object obj) throws Exception; + + String resolveImpl(Object obj) + { + try + { + Object result = resolve(obj); + if(result != null) + { + return result.toString(); + } + return ""; + } + catch(IllegalArgumentException ex) + { + throw ex; + } + catch(Exception ex) + { + ex.printStackTrace(); + assert(false); + return null; + } + } + + final String _name; + }; + + protected + AttributeResolver() + { + } + + public String + resolve(MetricsHelper helper, String attribute) + { + Resolver resolver = _attributes.get(attribute); + if(resolver == null) + { + if(attribute.equals("none")) + { + return ""; + } + String v = helper.defaultResolve(attribute); + if(v != null) + { + return v; + } + throw new IllegalArgumentException(attribute); + } + return resolver.resolveImpl(helper); + } + + public void + add(String name, final java.lang.reflect.Method method) + { + _attributes.put(name, new Resolver(name) + { + public Object + resolve(Object obj) throws Exception + { + return method.invoke(obj); + } + }); + } + + public void + add(String name, final java.lang.reflect.Field field) + { + _attributes.put(name, new Resolver(name) + { + public Object + resolve(Object obj) throws Exception + { + return field.get(obj); + } + }); + } + + public void + add(final String name, final java.lang.reflect.Method method, final java.lang.reflect.Field field) + { + _attributes.put(name, new Resolver(name) + { + public Object + resolve(Object obj) throws Exception + { + Object o = method.invoke(obj); + if(o != null) + { + return field.get(o); + } + throw new IllegalArgumentException(name); + } + }); + } + + public void + add(final String name, final java.lang.reflect.Method method, final java.lang.reflect.Method subMethod) + { + _attributes.put(name, new Resolver(name) + { + public Object + resolve(Object obj) throws Exception + { + Object o = method.invoke(obj); + if(o != null) + { + return subMethod.invoke(o); + } + throw new IllegalArgumentException(name); + } + }); + } + + private java.util.Map<String, Resolver> _attributes = new java.util.HashMap<String, Resolver>(); + }; + + protected + MetricsHelper(AttributeResolver attributes) + { + _attributes = attributes; + } + + public String + resolve(String attribute) + { + return _attributes.resolve(this, attribute); + } + + public void + initMetrics(T metrics) + { + // Override in specialized helpers. + } + + protected String + defaultResolve(String attribute) + { + return null; + } + + private AttributeResolver _attributes; +};
\ No newline at end of file diff --git a/java/src/IceMX/Observer.java b/java/src/IceMX/Observer.java new file mode 100644 index 00000000000..0cdc1074b12 --- /dev/null +++ b/java/src/IceMX/Observer.java @@ -0,0 +1,134 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceMX; + +import IceInternal.MetricsMap; + +public class Observer<T extends Metrics> extends IceUtilInternal.StopWatch implements Ice.Instrumentation.Observer +{ + public interface MetricsUpdate<T> + { + void update(T m); + }; + + public void + attach() + { + start(); + } + + public void + detach() + { + long lifetime = stop(); + for(MetricsMap<T>.Entry e : _objects) + { + e.detach(lifetime); + } + } + + public void + failed(String exceptionName) + { + for(MetricsMap<T>.Entry e : _objects) + { + e.failed(exceptionName); + } + } + + public void + forEach(MetricsUpdate<T> u) + { + for(MetricsMap<T>.Entry e : _objects) + { + e.execute(u); + } + } + + public void + init(MetricsHelper<T> helper, java.util.List<MetricsMap<T>.Entry> objects) + { + assert(_objects == null); + _objects = objects; + java.util.Collections.sort(_objects); + for(MetricsMap<T>.Entry e : _objects) + { + e.attach(helper); + } + } + + public void + update(MetricsHelper<T> helper, java.util.List<MetricsMap<T>.Entry> objects) + { + java.util.Collections.sort(objects); + java.util.ListIterator<MetricsMap<T>.Entry> p = objects.listIterator(); + java.util.ListIterator<MetricsMap<T>.Entry> q = _objects.listIterator(); + while(p.hasNext()) + { + MetricsMap<T>.Entry pe = p.next(); + MetricsMap<T>.Entry qe = q.hasNext() ? q.next() : null; + if(qe == null || pe.compareTo(qe) < 0) // New metrics object + { + q.add(pe); + q.previous(); + pe.attach(helper); + } + else if(pe == qe) // Same metrics object + { + // Nothing to do. + } + else // Removed metrics object + { + qe.detach(delay()); + q.remove(); + p.previous(); + } + } + while(q.hasNext()) + { + MetricsMap<T>.Entry qe = q.next(); + q.remove(); + qe.detach(delay()); + } + } + + public <S extends Metrics, ObserverImpl extends Observer<S>> ObserverImpl + getObserver(String mapName, MetricsHelper<S> helper, Class<S> mcl, Class<ObserverImpl> ocl) + { + java.util.List<MetricsMap<S>.Entry> metricsObjects = new java.util.LinkedList<MetricsMap<S>.Entry>(); + for(MetricsMap<T>.Entry entry : _objects) + { + MetricsMap<S>.Entry e = entry.getMatching(mapName, helper, mcl); + if(e != null) + { + metricsObjects.add(e); + } + } + + if(metricsObjects.isEmpty()) + { + return null; + } + + try + { + ObserverImpl obsv = ocl.newInstance(); + obsv.init(helper, metricsObjects); + return obsv; + } + catch(Exception ex) + { + assert(false); + return null; + } + } + + private java.util.List<MetricsMap<T>.Entry> _objects; +}; diff --git a/java/src/IceMX/ObserverFactory.java b/java/src/IceMX/ObserverFactory.java new file mode 100644 index 00000000000..c1bf521985d --- /dev/null +++ b/java/src/IceMX/ObserverFactory.java @@ -0,0 +1,142 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceMX; + +import IceInternal.MetricsMap; + +public class ObserverFactory<T extends Metrics, O extends Observer<T>> +{ + public + ObserverFactory(IceInternal.MetricsAdminI metrics, String name, Class<T> cl) + { + _metrics = metrics; + _name = name; + _class = cl; + _metrics.registerMap(name, _class, new Runnable() + { + public void + run() + { + update(); + } + }); + } + + public + ObserverFactory(String name, Class<T> cl) + { + _name = name; + _metrics = null; + _class = cl; + } + + public void + destroy() + { + if(_metrics != null) + { + _metrics.unregisterMap(_name); + } + } + + synchronized O + getObserver(MetricsHelper<T> helper, Class<O> cl) + { + return getObserver(helper, null, cl); + } + + @SuppressWarnings("unchecked") + synchronized O + getObserver(MetricsHelper<T> helper, Object observer, Class<O> cl) + { + + java.util.List<MetricsMap<T>.Entry> metricsObjects = new java.util.LinkedList<MetricsMap<T>.Entry>(); + for(MetricsMap<T> m : _maps) + { + MetricsMap<T>.Entry e = m.getMatching(helper); + if(e != null) + { + metricsObjects.add(e); + } + } + + if(metricsObjects.isEmpty()) + { + return null; + } + + O obsv; + if(observer == null) + { + try + { + obsv = cl.newInstance(); + } + catch(Exception ex) + { + assert(false); + return null; + } + obsv.init(helper, metricsObjects); + } + else + { + obsv = (O)observer; + obsv.update(helper, metricsObjects); + } + return obsv; + } + + public <S extends IceMX.Metrics> void + registerSubMap(String subMap, Class<S> cl, java.lang.reflect.Field field) + { + _metrics.registerSubMap(_name, subMap, cl, field); + } + + public boolean + isEnabled() + { + return _enabled; + } + + public void + update() + { + Runnable updater; + synchronized(this) + { + _maps.clear(); + for(MetricsMap<T> m : _metrics.getMaps(_name, _class)) + { + _maps.add(m); + } + _enabled = !_maps.isEmpty(); + updater = _updater; + } + + if(updater != null) + { + updater.run(); + } + } + + synchronized void + setUpdater(Runnable updater) + { + _updater = updater; + } + + private final IceInternal.MetricsAdminI _metrics; + private final String _name; + private final Class<T> _class; + private java.util.List<MetricsMap<T>> _maps = new java.util.ArrayList<MetricsMap<T>>(); + private volatile boolean _enabled; + private Runnable _updater; +}
\ No newline at end of file diff --git a/java/src/IceMX/ObserverI.java b/java/src/IceMX/ObserverI.java new file mode 100644 index 00000000000..c9bb9c8b7cf --- /dev/null +++ b/java/src/IceMX/ObserverI.java @@ -0,0 +1,14 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceMX; + +public class ObserverI extends Observer<Metrics> +{ +}; diff --git a/java/src/IceMX/ThreadObserverI.java b/java/src/IceMX/ThreadObserverI.java new file mode 100644 index 00000000000..79209c879e2 --- /dev/null +++ b/java/src/IceMX/ThreadObserverI.java @@ -0,0 +1,60 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceMX; + +public class ThreadObserverI extends Observer<ThreadMetrics> implements Ice.Instrumentation.ThreadObserver +{ + public void + stateChanged(final Ice.Instrumentation.ThreadState oldState, final Ice.Instrumentation.ThreadState newState) + { + _oldState = oldState; + _newState = newState; + forEach(_threadStateUpdate); + } + + private MetricsUpdate<ThreadMetrics> _threadStateUpdate = new MetricsUpdate<ThreadMetrics>() + { + public void + update(ThreadMetrics v) + { + switch(_oldState) + { + case ThreadStateInUseForIO: + --v.inUseForIO; + break; + case ThreadStateInUseForUser: + --v.inUseForUser; + break; + case ThreadStateInUseForOther: + --v.inUseForOther; + break; + default: + break; + } + switch(_newState) + { + case ThreadStateInUseForIO: + ++v.inUseForIO; + break; + case ThreadStateInUseForUser: + ++v.inUseForUser; + break; + case ThreadStateInUseForOther: + ++v.inUseForOther; + break; + default: + break; + } + } + }; + + private Ice.Instrumentation.ThreadState _oldState; + private Ice.Instrumentation.ThreadState _newState; +}
\ No newline at end of file diff --git a/java/src/IceSSL/EndpointI.java b/java/src/IceSSL/EndpointI.java index e768b26d40e..0113fef4a6e 100644 --- a/java/src/IceSSL/EndpointI.java +++ b/java/src/IceSSL/EndpointI.java @@ -15,12 +15,11 @@ final class EndpointI extends IceInternal.EndpointI EndpointI(Instance instance, String ho, int po, int ti, Ice.ProtocolVersion pv, Ice.EncodingVersion ev, String conId, boolean co) { - super(pv, ev); + super(pv, ev, conId); _instance = instance; _host = ho; _port = po; _timeout = ti; - _connectionId = conId; _compress = co; calcHashValue(); } @@ -28,7 +27,7 @@ final class EndpointI extends IceInternal.EndpointI public EndpointI(Instance instance, String str, boolean oaEndpoint) { - super(IceInternal.Protocol.currentProtocol, instance.defaultEncoding()); + super(IceInternal.Protocol.currentProtocol, instance.defaultEncoding(), ""); _instance = instance; _host = null; _port = 0; @@ -171,7 +170,7 @@ final class EndpointI extends IceInternal.EndpointI public EndpointI(Instance instance, IceInternal.BasicStream s) { - super(new Ice.ProtocolVersion(), new Ice.EncodingVersion()); + super(new Ice.ProtocolVersion(), new Ice.EncodingVersion(), ""); _instance = instance; s.startReadEncaps(); _host = s.readString(); @@ -514,11 +513,6 @@ final class EndpointI extends IceInternal.EndpointI return 1; } - if(!_connectionId.equals(p._connectionId)) - { - return _connectionId.compareTo(p._connectionId); - } - if(_timeout < p._timeout) { return -1; @@ -570,7 +564,6 @@ final class EndpointI extends IceInternal.EndpointI private String _host; private int _port; private int _timeout; - private String _connectionId = ""; private boolean _compress; private int _hashCode; } diff --git a/java/src/IceSSL/TransceiverI.java b/java/src/IceSSL/TransceiverI.java index 8b67ba18108..9b8d706370b 100644 --- a/java/src/IceSSL/TransceiverI.java +++ b/java/src/IceSSL/TransceiverI.java @@ -370,56 +370,48 @@ final class TransceiverI implements IceInternal.Transceiver // // This can only be called on an open transceiver. // - assert(_fd != null); - NativeConnectionInfo info = new NativeConnectionInfo(); - java.net.Socket socket = _fd.socket(); - if(socket.getLocalAddress() != null) - { - info.localAddress = socket.getLocalAddress().getHostAddress(); - info.localPort = socket.getLocalPort(); - } - else + if(_fd != null) { + java.net.Socket socket = _fd.socket(); // // On some platforms (e.g., early Android releases), sockets don't // correctly return address information. // - info.localAddress = ""; - info.localPort = -1; - } + if(socket.getLocalAddress() != null) + { + info.localAddress = socket.getLocalAddress().getHostAddress(); + info.localPort = socket.getLocalPort(); + } + + if(socket.getInetAddress() != null) + { + info.remoteAddress = socket.getInetAddress().getHostAddress(); + info.remotePort = socket.getPort(); + } - if(socket.getInetAddress() != null) - { - info.remoteAddress = socket.getInetAddress().getHostAddress(); - info.remotePort = socket.getPort(); - } - else - { - info.remoteAddress = ""; - info.remotePort = -1; - } - SSLSession session = _engine.getSession(); - info.cipher = session.getCipherSuite(); - try - { - java.util.ArrayList<String> certs = new java.util.ArrayList<String>(); - info.nativeCerts = session.getPeerCertificates(); - for(java.security.cert.Certificate c : info.nativeCerts) + SSLSession session = _engine.getSession(); + info.cipher = session.getCipherSuite(); + try { - StringBuffer s = new StringBuffer("-----BEGIN CERTIFICATE-----\n"); - s.append(IceUtilInternal.Base64.encode(c.getEncoded())); - s.append("\n-----END CERTIFICATE-----"); - certs.add(s.toString()); + java.util.ArrayList<String> certs = new java.util.ArrayList<String>(); + info.nativeCerts = session.getPeerCertificates(); + for(java.security.cert.Certificate c : info.nativeCerts) + { + StringBuffer s = new StringBuffer("-----BEGIN CERTIFICATE-----\n"); + s.append(IceUtilInternal.Base64.encode(c.getEncoded())); + s.append("\n-----END CERTIFICATE-----"); + certs.add(s.toString()); + } + info.certs = certs.toArray(new String[0]); + } + catch(java.security.cert.CertificateEncodingException ex) + { + } + catch(javax.net.ssl.SSLPeerUnverifiedException ex) + { + // No peer certificates. } - info.certs = certs.toArray(new String[0]); - } - catch(java.security.cert.CertificateEncodingException ex) - { - } - catch(javax.net.ssl.SSLPeerUnverifiedException ex) - { - // No peer certificates. } info.adapterName = _adapterName; info.incoming = _incoming; diff --git a/java/src/IceUtilInternal/StopWatch.java b/java/src/IceUtilInternal/StopWatch.java new file mode 100644 index 00000000000..de531a50330 --- /dev/null +++ b/java/src/IceUtilInternal/StopWatch.java @@ -0,0 +1,42 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceUtilInternal; + +public class StopWatch +{ + public void + start() + { + _s = System.nanoTime(); + } + + public long + stop() + { + assert(isStarted()); + long d = (System.nanoTime() - _s) / 1000; + _s = 0; + return d; + } + + public boolean + isStarted() + { + return _s != 0; + } + + public long + delay() + { + return (System.nanoTime() - _s) / 1000; + } + + private long _s = 0; +}; diff --git a/java/src/IceUtilInternal/StringUtil.java b/java/src/IceUtilInternal/StringUtil.java index 5201c4fd199..c990818541b 100644 --- a/java/src/IceUtilInternal/StringUtil.java +++ b/java/src/IceUtilInternal/StringUtil.java @@ -483,4 +483,54 @@ public final class StringUtil } return 0; // Not quoted } + + public static boolean + match(String s, String pat, boolean emptyMatch) + { + assert(s.length() > 0); + assert(pat.length() > 0); + + // + // If pattern does not contain a wildcard just compare strings. + // + int beginIndex = pat.indexOf('*'); + if(beginIndex < 0) + { + return s.equals(pat); + } + + // + // Make sure start of the strings match + // + if(beginIndex > s.length() || !s.substring(0, beginIndex).equals(pat.substring(0, beginIndex))) + { + return false; + } + + // + // Make sure there is something present in the middle to match the + // wildcard. If emptyMatch is true, allow a match of "". + // + int endLength = pat.length() - beginIndex - 1; + if(endLength > s.length()) + { + return false; + } + int endIndex = s.length() - endLength; + if(endIndex < beginIndex || (!emptyMatch && endIndex == beginIndex)) + { + return false; + } + + // + // Make sure end of the strings match + // + if(!s.substring(endIndex, s.length() - endIndex).equals( + pat.substring(beginIndex + 1, pat.length() - beginIndex - 1))) + { + return false; + } + + return true; + } } |