diff options
author | Benoit Foucher <benoit@zeroc.com> | 2013-09-09 19:12:28 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2013-09-09 19:12:28 +0200 |
commit | 8a0d1c7e34d8bd18bd85666cce94403a5158975c (patch) | |
tree | 36050a818282c0a92ee88a6ef28354e186c5aebe /java | |
parent | Test scripts improvements (diff) | |
download | ice-8a0d1c7e34d8bd18bd85666cce94403a5158975c.tar.bz2 ice-8a0d1c7e34d8bd18bd85666cce94403a5158975c.tar.xz ice-8a0d1c7e34d8bd18bd85666cce94403a5158975c.zip |
Fixed ICE-5196: allow setting an observer with IceMX enabled
Diffstat (limited to 'java')
27 files changed, 711 insertions, 75 deletions
diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java index 82fd56d46a6..21b6525a05a 100644 --- a/java/src/Ice/CommunicatorI.java +++ b/java/src/Ice/CommunicatorI.java @@ -140,9 +140,10 @@ public final class CommunicatorI implements Communicator return _instance.initializationData().stats; } - public Ice.Instrumentation.CommunicatorObserver getObserver() + public Ice.Instrumentation.CommunicatorObserver + getObserver() { - return _instance.initializationData().observer; + return _instance.getObserver(); } public RouterPrx diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 32bf6308d05..bdd49e86ab7 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -257,11 +257,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return; } - assert(_instance.initializationData().observer != null); - _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(), - _endpoint, - toConnectionState(_state), - _observer); + assert(_instance.getObserver() != null); + _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), + _endpoint, + toConnectionState(_state), + _observer); if(_observer != null) { _observer.attach(); @@ -1831,16 +1831,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - if(_instance.initializationData().observer != null) + if(_instance.getObserver() != null) { Ice.Instrumentation.ConnectionState oldState = toConnectionState(_state); Ice.Instrumentation.ConnectionState newState = toConnectionState(state); if(oldState != newState) { - _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(), - _endpoint, - newState, - _observer); + _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), + _endpoint, + newState, + _observer); if(_observer != null) { _observer.attach(); diff --git a/java/src/IceInternal/CommunicatorObserverI.java b/java/src/IceInternal/CommunicatorObserverI.java index 056ef2cd007..78c44da0905 100644 --- a/java/src/IceInternal/CommunicatorObserverI.java +++ b/java/src/IceInternal/CommunicatorObserverI.java @@ -583,16 +583,27 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb public CommunicatorObserverI(IceInternal.MetricsAdminI metrics) { - _metrics = metrics; + this(metrics, null); + } - _connections = new ObserverFactory<ConnectionMetrics, ConnectionObserverI>(metrics, "Connection", - ConnectionMetrics.class); - _dispatch = new ObserverFactory<DispatchMetrics, DispatchObserverI>(metrics, "Dispatch", DispatchMetrics.class); - _invocations = new ObserverFactory<InvocationMetrics, InvocationObserverI>(metrics, "Invocation", - InvocationMetrics.class); - _threads = new ObserverFactory<ThreadMetrics, ThreadObserverI>(metrics, "Thread", ThreadMetrics.class); - _connects = new ObserverFactory<Metrics, ObserverI>(metrics, "ConnectionEstablishment", Metrics.class); - _endpointLookups = new ObserverFactory<Metrics, ObserverI>(metrics, "EndpointLookup", Metrics.class); + public + CommunicatorObserverI(IceInternal.MetricsAdminI metrics, Ice.Instrumentation.CommunicatorObserver delegate) + { + _metrics = metrics; + _delegate = delegate; + + _connections = new ObserverFactoryWithDelegate<ConnectionMetrics, ConnectionObserverI, + Ice.Instrumentation.ConnectionObserver>(metrics, "Connection", ConnectionMetrics.class); + _dispatch = new ObserverFactoryWithDelegate<DispatchMetrics, DispatchObserverI, + Ice.Instrumentation.DispatchObserver>(metrics, "Dispatch", DispatchMetrics.class); + _invocations = new ObserverFactoryWithDelegate<InvocationMetrics, InvocationObserverI, + Ice.Instrumentation.InvocationObserver>(metrics, "Invocation", InvocationMetrics.class); + _threads = new ObserverFactoryWithDelegate<ThreadMetrics, ThreadObserverI, + Ice.Instrumentation.ThreadObserver>(metrics, "Thread", ThreadMetrics.class); + _connects = new ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI, + Ice.Instrumentation.Observer>(metrics, "ConnectionEstablishment", Metrics.class); + _endpointLookups = new ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI, + Ice.Instrumentation.Observer>(metrics, "EndpointLookup", Metrics.class); try { @@ -612,11 +623,17 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb { try { - return _connects.getObserver(new EndpointHelper(endpt, connector), ObserverI.class); + Ice.Instrumentation.Observer delegate = null; + if(_delegate != null) + { + delegate = _delegate.getConnectionEstablishmentObserver(endpt, connector); + } + return _connects.getObserver(new EndpointHelper(endpt, connector), ObserverWithDelegateI.class, + delegate); } catch(Exception ex) { - _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex)); } } return null; @@ -629,11 +646,16 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb { try { - return _endpointLookups.getObserver(new EndpointHelper(endpt), ObserverI.class); + Ice.Instrumentation.Observer delegate = null; + if(_delegate != null) + { + delegate = _delegate.getEndpointLookupObserver(endpt); + } + return _endpointLookups.getObserver(new EndpointHelper(endpt), ObserverWithDelegateI.class, delegate); } catch(Exception ex) { - _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex)); } } @@ -642,34 +664,47 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb public Ice.Instrumentation.ConnectionObserver getConnectionObserver(Ice.ConnectionInfo c, Ice.Endpoint e, Ice.Instrumentation.ConnectionState s, - Ice.Instrumentation.ConnectionObserver o) + Ice.Instrumentation.ConnectionObserver observer) { if(_connections.isEnabled()) { try { - return _connections.getObserver(new ConnectionHelper(c, e, s), o, ConnectionObserverI.class); + Ice.Instrumentation.ConnectionObserver delegate = null; + ConnectionObserverI o = observer instanceof ConnectionObserverI ? (ConnectionObserverI)observer : null; + if(_delegate != null) + { + delegate = _delegate.getConnectionObserver(c, e, s, o != null ? o.getDelegate() : observer); + } + return _connections.getObserver(new ConnectionHelper(c, e, s), o, ConnectionObserverI.class, delegate); } catch(Exception ex) { - _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex)); } } return null; } public Ice.Instrumentation.ThreadObserver - getThreadObserver(String parent, String id, Ice.Instrumentation.ThreadState s, Ice.Instrumentation.ThreadObserver o) + getThreadObserver(String parent, String id, Ice.Instrumentation.ThreadState s, + Ice.Instrumentation.ThreadObserver observer) { if(_threads.isEnabled()) { try { - return _threads.getObserver(new ThreadHelper(parent, id, s), o, ThreadObserverI.class); + Ice.Instrumentation.ThreadObserver delegate = null; + ThreadObserverI o = observer instanceof ThreadObserverI ? (ThreadObserverI)observer : null; + if(_delegate != null) + { + delegate = _delegate.getThreadObserver(parent, id, s, o != null ? o.getDelegate() : observer); + } + return _threads.getObserver(new ThreadHelper(parent, id, s), o, ThreadObserverI.class, delegate); } catch(Exception ex) { - _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex)); } } return null; @@ -682,11 +717,18 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb { try { - return _invocations.getObserver(new InvocationHelper(prx, operation, ctx), InvocationObserverI.class); + Ice.Instrumentation.InvocationObserver delegate = null; + if(_delegate != null) + { + delegate = _delegate.getInvocationObserver(prx, operation, ctx); + } + return _invocations.getObserver(new InvocationHelper(prx, operation, ctx), + InvocationObserverI.class, + delegate); } catch(Exception ex) { - _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex)); } } return null; @@ -699,11 +741,16 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb { try { - return _dispatch.getObserver(new DispatchHelper(c, size), DispatchObserverI.class); + Ice.Instrumentation.DispatchObserver delegate = null; + if(_delegate != null) + { + delegate = _delegate.getDispatchObserver(c, size); + } + return _dispatch.getObserver(new DispatchHelper(c, size), DispatchObserverI.class, delegate); } catch(Exception ex) { - _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex)); } } return null; @@ -726,6 +773,11 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb updater.updateThreadObservers(); } }); + + if(_delegate != null) + { + _delegate.setObserverUpdater(updater); + } } public IceInternal.MetricsAdminI getMetricsAdmin() @@ -734,10 +786,17 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb } final private IceInternal.MetricsAdminI _metrics; - final private ObserverFactory<ConnectionMetrics, ConnectionObserverI> _connections; - final private ObserverFactory<DispatchMetrics, DispatchObserverI> _dispatch; - final private ObserverFactory<InvocationMetrics, InvocationObserverI> _invocations; - final private ObserverFactory<ThreadMetrics, ThreadObserverI> _threads; - final private ObserverFactory<Metrics, ObserverI> _connects; - final private ObserverFactory<Metrics, ObserverI> _endpointLookups; + final private Ice.Instrumentation.CommunicatorObserver _delegate; + final private ObserverFactoryWithDelegate<ConnectionMetrics, ConnectionObserverI, + Ice.Instrumentation.ConnectionObserver> _connections; + final private ObserverFactoryWithDelegate<DispatchMetrics, DispatchObserverI, + Ice.Instrumentation.DispatchObserver> _dispatch; + final private ObserverFactoryWithDelegate<InvocationMetrics, InvocationObserverI, + Ice.Instrumentation.InvocationObserver> _invocations; + final private ObserverFactoryWithDelegate<ThreadMetrics, ThreadObserverI, + Ice.Instrumentation.ThreadObserver> _threads; + final private ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI, + Ice.Instrumentation.Observer> _connects; + final private ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI, + Ice.Instrumentation.Observer> _endpointLookups; } diff --git a/java/src/IceInternal/ConnectionObserverI.java b/java/src/IceInternal/ConnectionObserverI.java index 847b0216ec8..9aa68be1280 100644 --- a/java/src/IceInternal/ConnectionObserverI.java +++ b/java/src/IceInternal/ConnectionObserverI.java @@ -9,7 +9,8 @@ package IceInternal; -public class ConnectionObserverI extends IceMX.Observer<IceMX.ConnectionMetrics> +public class ConnectionObserverI + extends IceMX.ObserverWithDelegate<IceMX.ConnectionMetrics, Ice.Instrumentation.ConnectionObserver> implements Ice.Instrumentation.ConnectionObserver { public void @@ -17,6 +18,10 @@ public class ConnectionObserverI extends IceMX.Observer<IceMX.ConnectionMetrics> { _sentBytes = num; forEach(_sentBytesUpdate); + if(_delegate != null) + { + _delegate.sentBytes(num); + } } public void @@ -24,6 +29,10 @@ public class ConnectionObserverI extends IceMX.Observer<IceMX.ConnectionMetrics> { _receivedBytes = num; forEach(_receivedBytesUpdate); + if(_delegate != null) + { + _delegate.receivedBytes(num); + } } private MetricsUpdate<IceMX.ConnectionMetrics> _sentBytesUpdate = new MetricsUpdate<IceMX.ConnectionMetrics>() diff --git a/java/src/IceInternal/DispatchObserverI.java b/java/src/IceInternal/DispatchObserverI.java index 97dadc19d0d..c5366c4ac69 100644 --- a/java/src/IceInternal/DispatchObserverI.java +++ b/java/src/IceInternal/DispatchObserverI.java @@ -9,13 +9,18 @@ package IceInternal; -public class DispatchObserverI extends IceMX.Observer<IceMX.DispatchMetrics> +public class DispatchObserverI + extends IceMX.ObserverWithDelegate<IceMX.DispatchMetrics, Ice.Instrumentation.DispatchObserver> implements Ice.Instrumentation.DispatchObserver { public void userException() { forEach(_userException); + if(_delegate != null) + { + _delegate.userException(); + } } public void @@ -29,6 +34,10 @@ public class DispatchObserverI extends IceMX.Observer<IceMX.DispatchMetrics> v.replySize += size; } }); + if(_delegate != null) + { + _delegate.reply(size); + } } final MetricsUpdate<IceMX.DispatchMetrics> _userException = new MetricsUpdate<IceMX.DispatchMetrics>() diff --git a/java/src/IceInternal/EndpointHostResolver.java b/java/src/IceInternal/EndpointHostResolver.java index 25f144c66a5..57ed431299d 100644 --- a/java/src/IceInternal/EndpointHostResolver.java +++ b/java/src/IceInternal/EndpointHostResolver.java @@ -52,7 +52,7 @@ public class EndpointHostResolver } } - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); Ice.Instrumentation.Observer observer = null; if(obsv != null) { @@ -110,7 +110,7 @@ public class EndpointHostResolver entry.endpoint = endpoint; entry.callback = callback; - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) { entry.observer = obsv.getEndpointLookupObserver(endpoint); @@ -240,7 +240,7 @@ public class EndpointHostResolver synchronized public void updateObserver() { - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) { _observer = obsv.getThreadObserver("Communicator", diff --git a/java/src/IceInternal/Incoming.java b/java/src/IceInternal/Incoming.java index cccdfc43a33..5b76d223f8c 100644 --- a/java/src/IceInternal/Incoming.java +++ b/java/src/IceInternal/Incoming.java @@ -123,7 +123,7 @@ final public class Incoming extends IncomingBase implements Ice.Request _current.ctx.put(first, second); } - CommunicatorObserver obsv = _instance.initializationData().observer; + CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) { // Read the parameter encapsulation size. diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index 03fbc58c434..a837018cd6a 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -531,6 +531,13 @@ public final class Instance return result; } + public Ice.Instrumentation.CommunicatorObserver + getObserver() + { + return _observer; // Immutable + } + + public synchronized void setDefaultLocator(Ice.LocatorPrx locator) { @@ -817,18 +824,20 @@ public final class Instance // Setup the communicator observer only if the user didn't already set an // Ice observer resolver and if the admininistrative endpoints are set. // - if(_initData.observer == null && - (_adminFacetFilter.isEmpty() || _adminFacetFilter.contains("Metrics")) && + if((_adminFacetFilter.isEmpty() || _adminFacetFilter.contains("Metrics")) && _initData.properties.getProperty("Ice.Admin.Endpoints").length() > 0) { - CommunicatorObserverI observer = new CommunicatorObserverI(admin); - _initData.observer = observer; + _observer = new CommunicatorObserverI(admin, _initData.observer); // // Make sure the admin plugin receives property updates. // props.addUpdateCallback(admin); } + else + { + _observer = _initData.observer; + } } catch(Ice.LocalException ex) { @@ -882,9 +891,9 @@ public final class Instance // // Set observer updater // - if(_initData.observer != null) + if(_observer != null) { - _initData.observer.setObserverUpdater(new ObserverUpdaterI(this)); + _observer.setObserverUpdater(new ObserverUpdaterI(this)); } // @@ -1235,6 +1244,7 @@ public final class Instance private final int _clientACM; // Immutable, not reset by destroy(). private final int _serverACM; // Immutable, not reset by destroy(). private final Ice.ImplicitContextI _implicitContext; + private final Ice.Instrumentation.CommunicatorObserver _observer; private RouterManager _routerManager; private LocatorManager _locatorManager; private ReferenceFactory _referenceFactory; diff --git a/java/src/IceInternal/InvocationObserverI.java b/java/src/IceInternal/InvocationObserverI.java index c36b8a581c3..dd98db9df23 100644 --- a/java/src/IceInternal/InvocationObserverI.java +++ b/java/src/IceInternal/InvocationObserverI.java @@ -11,7 +11,8 @@ package IceInternal; import IceMX.*; -public class InvocationObserverI extends IceMX.Observer<IceMX.InvocationMetrics> +public class InvocationObserverI + extends IceMX.ObserverWithDelegate<IceMX.InvocationMetrics, Ice.Instrumentation.InvocationObserver> implements Ice.Instrumentation.InvocationObserver { static public final class RemoteInvocationHelper extends MetricsHelper<RemoteMetrics> @@ -117,21 +118,35 @@ public class InvocationObserverI extends IceMX.Observer<IceMX.InvocationMetrics> userException() { forEach(_userException); + if(_delegate != null) + { + _delegate.userException(); + } } public void retried() { forEach(_incrementRetry); + if(_delegate != null) + { + _delegate.retried(); + } } public Ice.Instrumentation.RemoteObserver getRemoteObserver(Ice.ConnectionInfo con, Ice.Endpoint edpt, int requestId, int sz) { + Ice.Instrumentation.RemoteObserver delegate = null; + if(_delegate != null) + { + delegate = _delegate.getRemoteObserver(con, edpt, requestId, sz); + } return (Ice.Instrumentation.RemoteObserver)getObserver("Remote", new RemoteInvocationHelper(con, edpt, requestId, sz), RemoteMetrics.class, - RemoteObserverI.class); + RemoteObserverI.class, + delegate); } final MetricsUpdate<InvocationMetrics> _incrementRetry = new MetricsUpdate<InvocationMetrics>() diff --git a/java/src/IceInternal/ObserverHelper.java b/java/src/IceInternal/ObserverHelper.java index d2b2be71177..6e1514baa8f 100644 --- a/java/src/IceInternal/ObserverHelper.java +++ b/java/src/IceInternal/ObserverHelper.java @@ -17,7 +17,7 @@ public final class ObserverHelper static public InvocationObserver get(Instance instance, String op) { - CommunicatorObserver obsv = instance.initializationData().observer; + CommunicatorObserver obsv = instance.getObserver(); if(obsv != null) { InvocationObserver observer = obsv.getInvocationObserver(null, op, _emptyContext); @@ -39,8 +39,7 @@ public final class ObserverHelper static public InvocationObserver get(Ice.ObjectPrx proxy, String op, java.util.Map<String, String> context) { - CommunicatorObserver obsv = - ((Ice.ObjectPrxHelperBase)proxy).__reference().getInstance().initializationData().observer; + CommunicatorObserver obsv = ((Ice.ObjectPrxHelperBase)proxy).__reference().getInstance().getObserver(); if(obsv != null) { InvocationObserver observer; diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 82e7c1adfe8..3bf55802f61 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -217,7 +217,7 @@ public final class OutgoingConnectionFactory // Try to establish the connection to the connectors. // DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); java.util.Iterator<ConnectorInfo> q = connectors.iterator(); ConnectorInfo ci = null; while(q.hasNext()) @@ -1212,7 +1212,7 @@ public final class OutgoingConnectionFactory assert(_iter.hasNext()); _current = _iter.next(); - Ice.Instrumentation.CommunicatorObserver obsv = _factory._instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _factory._instance.getObserver(); if(obsv != null) { _observer = obsv.getConnectionEstablishmentObserver(_current.endpoint, diff --git a/java/src/IceInternal/RemoteObserverI.java b/java/src/IceInternal/RemoteObserverI.java index 4ebc62e51d2..eb5b470a765 100644 --- a/java/src/IceInternal/RemoteObserverI.java +++ b/java/src/IceInternal/RemoteObserverI.java @@ -9,7 +9,8 @@ package IceInternal; -public class RemoteObserverI extends IceMX.Observer<IceMX.RemoteMetrics> +public class RemoteObserverI + extends IceMX.ObserverWithDelegate<IceMX.RemoteMetrics, Ice.Instrumentation.RemoteObserver> implements Ice.Instrumentation.RemoteObserver { public void @@ -23,5 +24,9 @@ public class RemoteObserverI extends IceMX.Observer<IceMX.RemoteMetrics> v.replySize += size; } }); + if(_delegate != null) + { + _delegate.reply(size); + } } }
\ No newline at end of file diff --git a/java/src/IceInternal/ThreadObserverI.java b/java/src/IceInternal/ThreadObserverI.java index 7086618b4f7..f92784b12c1 100644 --- a/java/src/IceInternal/ThreadObserverI.java +++ b/java/src/IceInternal/ThreadObserverI.java @@ -9,7 +9,9 @@ package IceInternal; -public class ThreadObserverI extends IceMX.Observer<IceMX.ThreadMetrics> implements Ice.Instrumentation.ThreadObserver +public class ThreadObserverI + extends IceMX.ObserverWithDelegate<IceMX.ThreadMetrics, Ice.Instrumentation.ThreadObserver> + implements Ice.Instrumentation.ThreadObserver { public void stateChanged(final Ice.Instrumentation.ThreadState oldState, final Ice.Instrumentation.ThreadState newState) @@ -17,6 +19,10 @@ public class ThreadObserverI extends IceMX.Observer<IceMX.ThreadMetrics> impleme _oldState = oldState; _newState = newState; forEach(_threadStateUpdate); + if(_delegate != null) + { + _delegate.stateChanged(oldState, newState); + } } private MetricsUpdate<IceMX.ThreadMetrics> _threadStateUpdate = new MetricsUpdate<IceMX.ThreadMetrics>() diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 19783697d1d..b7bc1b83c50 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -622,7 +622,7 @@ public final class ThreadPool updateObserver() { // Must be called with the thread pool mutex locked - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) { _observer = obsv.getThreadObserver(_prefix, _name, _state, _observer); diff --git a/java/src/IceMX/ObserverFactory.java b/java/src/IceMX/ObserverFactory.java index 9a99e7a86e1..3881877c427 100644 --- a/java/src/IceMX/ObserverFactory.java +++ b/java/src/IceMX/ObserverFactory.java @@ -29,14 +29,6 @@ public class ObserverFactory<T extends Metrics, O extends Observer<T>> }); } - public - ObserverFactory(String name, Class<T> cl) - { - _name = name; - _metrics = null; - _class = cl; - } - public void destroy() { @@ -56,7 +48,14 @@ public class ObserverFactory<T extends Metrics, O extends Observer<T>> public synchronized O getObserver(MetricsHelper<T> helper, Object observer, Class<O> cl) { - O old = (O)observer; + O old = null; + try + { + old = (O)observer; + } + catch(ClassCastException ex) + { + } java.util.List<MetricsMap<T>.Entry> metricsObjects = null; for(MetricsMap<T> m : _maps) { diff --git a/java/src/IceMX/ObserverFactoryWithDelegate.java b/java/src/IceMX/ObserverFactoryWithDelegate.java new file mode 100644 index 00000000000..e900d09df40 --- /dev/null +++ b/java/src/IceMX/ObserverFactoryWithDelegate.java @@ -0,0 +1,48 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceMX; + +public class ObserverFactoryWithDelegate<T extends Metrics, + OImpl extends ObserverWithDelegate<T, O>, + O extends Ice.Instrumentation.Observer> + extends ObserverFactory<T, OImpl> +{ + public + ObserverFactoryWithDelegate(IceInternal.MetricsAdminI metrics, String name, Class<T> cl) + { + super(metrics, name, cl); + } + + @SuppressWarnings("unchecked") + public O + getObserver(MetricsHelper<T> helper, Class<OImpl> cl, O delegate) + { + OImpl o = super.getObserver(helper, cl); + if(o != null) + { + o.setDelegate(delegate); + return (O)o; + } + return delegate; + } + + @SuppressWarnings("unchecked") + public O + getObserver(MetricsHelper<T> helper, Object observer, Class<OImpl> cl, O delegate) + { + OImpl o = super.getObserver(helper, observer, cl); + if(o != null) + { + o.setDelegate(delegate); + return (O)o; + } + return delegate; + } +}; diff --git a/java/src/IceMX/ObserverWithDelegate.java b/java/src/IceMX/ObserverWithDelegate.java new file mode 100644 index 00000000000..a699533e282 --- /dev/null +++ b/java/src/IceMX/ObserverWithDelegate.java @@ -0,0 +1,71 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceMX; + +public class ObserverWithDelegate<T extends Metrics, O extends Ice.Instrumentation.Observer> extends Observer<T> +{ + public void + attach() + { + super.attach(); + if(_delegate != null) + { + _delegate.attach(); + } + } + + public void + detach() + { + super.detach(); + if(_delegate != null) + { + _delegate.detach(); + } + } + + public void + failed(String exceptionName) + { + super.failed(exceptionName); + if(_delegate != null) + { + _delegate.failed(exceptionName); + } + } + + public O + getDelegate() + { + return _delegate; + } + + public void + setDelegate(O del) + { + _delegate = del; + } + + @SuppressWarnings("unchecked") + public <S extends Metrics, ObserverImpl extends ObserverWithDelegate<S, Obs>, + Obs extends Ice.Instrumentation.Observer> Obs + getObserver(String mapName, MetricsHelper<S> helper, Class<S> mcl, Class<ObserverImpl> ocl, Obs delegate) + { + ObserverImpl obsv = super.getObserver(mapName, helper, mcl, ocl); + if(obsv != null) + { + obsv.setDelegate(delegate); + return (Obs)obsv; + } + return delegate; + } + + protected O _delegate; +}; diff --git a/java/src/IceMX/ObserverI.java b/java/src/IceMX/ObserverWithDelegateI.java index 23baf30408d..ab0c72c48ee 100644 --- a/java/src/IceMX/ObserverI.java +++ b/java/src/IceMX/ObserverWithDelegateI.java @@ -9,6 +9,6 @@ package IceMX; -public class ObserverI extends Observer<Metrics> +public class ObserverWithDelegateI extends ObserverWithDelegate<Metrics, Ice.Instrumentation.Observer> { }; diff --git a/java/test/Ice/metrics/AllTests.java b/java/test/Ice/metrics/AllTests.java index 6c36493f621..35c874eaf8f 100644 --- a/java/test/Ice/metrics/AllTests.java +++ b/java/test/Ice/metrics/AllTests.java @@ -397,7 +397,7 @@ public class AllTests } static MetricsPrx - allTests(Ice.Communicator communicator, PrintWriter out) + allTests(Ice.Communicator communicator, PrintWriter out, CommunicatorObserverI obsv) throws IceMX.UnknownMetricsView { MetricsPrx metrics = MetricsPrxHelper.checkedCast(communicator.stringToProxy("metrics:default -p 12010")); @@ -1076,6 +1076,41 @@ public class AllTests out.println("ok"); + out.print("testing instrumentation observer delegate... "); + out.flush(); + + test(obsv.threadObserver.total > 0); + test(obsv.connectionObserver.total > 0); + test(obsv.connectionEstablishmentObserver.total > 0); + test(obsv.endpointLookupObserver.total > 0); + test(obsv.dispatchObserver.total > 0); + test(obsv.invocationObserver.total > 0); + test(obsv.invocationObserver.remoteObserver.total > 0); + + test(obsv.threadObserver.current > 0); + test(obsv.connectionObserver.current > 0); + test(obsv.connectionEstablishmentObserver.current == 0); + test(obsv.endpointLookupObserver.current == 0); + test(obsv.dispatchObserver.current == 0); + test(obsv.invocationObserver.current == 0); + test(obsv.invocationObserver.remoteObserver.current == 0); + + test(obsv.threadObserver.failedCount == 0); + test(obsv.connectionObserver.failedCount > 0); + test(obsv.connectionEstablishmentObserver.failedCount > 0); + test(obsv.endpointLookupObserver.failedCount > 0); + //test(obsv.dispatchObserver.failedCount > 0); + test(obsv.invocationObserver.failedCount > 0); + test(obsv.invocationObserver.remoteObserver.failedCount > 0); + + test(obsv.threadObserver.states > 0); + test(obsv.connectionObserver.received > 0 && obsv.connectionObserver.sent > 0); + //test(obsv.dispatchObserver.userExceptionCount > 0); + test(obsv.invocationObserver.userExceptionCount > 0 && obsv.invocationObserver.retriedCount > 0); + test(obsv.invocationObserver.remoteObserver.replySize > 0); + + out.println("ok"); + return metrics; } } diff --git a/java/test/Ice/metrics/Client.java b/java/test/Ice/metrics/Client.java index 543819d1672..7ed4770bf06 100644 --- a/java/test/Ice/metrics/Client.java +++ b/java/test/Ice/metrics/Client.java @@ -18,7 +18,7 @@ public class Client extends test.Util.Application Ice.Communicator communicator = communicator(); try { - MetricsPrx metrics = AllTests.allTests(communicator, getWriter()); + MetricsPrx metrics = AllTests.allTests(communicator, getWriter(), _observer); metrics.shutdown(); } catch(Ice.UserException ex) @@ -40,6 +40,7 @@ public class Client extends test.Util.Application initData.properties.setProperty("Ice.Admin.DelayCreation", "1"); initData.properties.setProperty("Ice.Warn.Connections", "0"); initData.properties.setProperty("Ice.MessageSizeMax", "50000"); + initData.observer = _observer; return initData; } @@ -50,4 +51,6 @@ public class Client extends test.Util.Application System.gc(); System.exit(result); } + + private CommunicatorObserverI _observer = new CommunicatorObserverI(); } diff --git a/java/test/Ice/metrics/CommunicatorObserverI.java b/java/test/Ice/metrics/CommunicatorObserverI.java new file mode 100644 index 00000000000..2973880f884 --- /dev/null +++ b/java/test/Ice/metrics/CommunicatorObserverI.java @@ -0,0 +1,140 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorObserver +{ + private static void + test(boolean b) + { + if(!b) + { + throw new RuntimeException(); + } + } + + public void + setObserverUpdater(Ice.Instrumentation.ObserverUpdater u) + { + updater = u; + } + + synchronized public Ice.Instrumentation.Observer + getConnectionEstablishmentObserver(Ice.Endpoint e, String s) + { + if(connectionEstablishmentObserver == null) + { + connectionEstablishmentObserver = new ObserverI(); + connectionEstablishmentObserver.reset(); + } + return connectionEstablishmentObserver; + } + + + synchronized public Ice.Instrumentation.Observer + getEndpointLookupObserver(Ice.Endpoint e) + { + if(endpointLookupObserver == null) + { + endpointLookupObserver = new ObserverI(); + endpointLookupObserver.reset(); + } + return endpointLookupObserver; + } + + synchronized public Ice.Instrumentation.ConnectionObserver + getConnectionObserver(Ice.ConnectionInfo c, + Ice.Endpoint e, + Ice.Instrumentation.ConnectionState s, + Ice.Instrumentation.ConnectionObserver old) + { + test(old == null || old instanceof ConnectionObserverI); + if(connectionObserver == null) + { + connectionObserver = new ConnectionObserverI(); + connectionObserver.reset(); + } + return connectionObserver; + } + + synchronized public Ice.Instrumentation.ThreadObserver + getThreadObserver(String p, String id, Ice.Instrumentation.ThreadState s, + Ice.Instrumentation.ThreadObserver old) + { + test(old == null || old instanceof ThreadObserverI); + if(threadObserver == null) + { + threadObserver = new ThreadObserverI(); + threadObserver.reset(); + } + return threadObserver; + } + + synchronized public Ice.Instrumentation.InvocationObserver + getInvocationObserver(Ice.ObjectPrx p, String op, java.util.Map<String, String> ctx) + { + if(invocationObserver == null) + { + invocationObserver = new InvocationObserverI(); + invocationObserver.reset(); + } + return invocationObserver; + } + + synchronized public Ice.Instrumentation.DispatchObserver + getDispatchObserver(Ice.Current current, int s) + { + if(dispatchObserver == null) + { + dispatchObserver = new DispatchObserverI(); + dispatchObserver.reset(); + } + return dispatchObserver; + } + + synchronized void + reset() + { + if(connectionEstablishmentObserver != null) + { + connectionEstablishmentObserver.reset(); + } + if(endpointLookupObserver != null) + { + endpointLookupObserver.reset(); + } + if(connectionObserver != null) + { + connectionObserver.reset(); + } + if(threadObserver != null) + { + threadObserver.reset(); + } + if(invocationObserver != null) + { + invocationObserver.reset(); + } + if(dispatchObserver != null) + { + dispatchObserver.reset(); + } + } + + Ice.Instrumentation.ObserverUpdater updater; + + ObserverI connectionEstablishmentObserver; + ObserverI endpointLookupObserver; + ConnectionObserverI connectionObserver; + ThreadObserverI threadObserver; + InvocationObserverI invocationObserver; + DispatchObserverI dispatchObserver; +}; + diff --git a/java/test/Ice/metrics/ConnectionObserverI.java b/java/test/Ice/metrics/ConnectionObserverI.java new file mode 100644 index 00000000000..721d527d748 --- /dev/null +++ b/java/test/Ice/metrics/ConnectionObserverI.java @@ -0,0 +1,37 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class ConnectionObserverI extends ObserverI implements Ice.Instrumentation.ConnectionObserver +{ + public synchronized void + reset() + { + super.reset(); + received = 0; + sent = 0; + } + + public synchronized void + sentBytes(int s) + { + sent += s; + } + + public synchronized void + receivedBytes(int s) + { + received += s; + } + + int sent; + int received; +}; + diff --git a/java/test/Ice/metrics/DispatchObserverI.java b/java/test/Ice/metrics/DispatchObserverI.java new file mode 100644 index 00000000000..1a29c236bd9 --- /dev/null +++ b/java/test/Ice/metrics/DispatchObserverI.java @@ -0,0 +1,37 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class DispatchObserverI extends ObserverI implements Ice.Instrumentation.DispatchObserver +{ + public synchronized void + reset() + { + super.reset(); + userExceptionCount = 0; + replySize = 0; + } + + public synchronized void + userException() + { + ++userExceptionCount; + } + + public synchronized void + reply(int s) + { + replySize += s; + } + + int userExceptionCount; + int replySize; +}; + diff --git a/java/test/Ice/metrics/InvocationObserverI.java b/java/test/Ice/metrics/InvocationObserverI.java new file mode 100644 index 00000000000..447ea094c5b --- /dev/null +++ b/java/test/Ice/metrics/InvocationObserverI.java @@ -0,0 +1,53 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class InvocationObserverI extends ObserverI implements Ice.Instrumentation.InvocationObserver +{ + public synchronized void + reset() + { + super.reset(); + retriedCount = 0; + userExceptionCount = 0; + if(remoteObserver != null) + { + remoteObserver.reset(); + } + } + + public synchronized void + retried() + { + ++retriedCount; + } + + public synchronized void + userException() + { + ++userExceptionCount; + } + + public synchronized Ice.Instrumentation.RemoteObserver + getRemoteObserver(Ice.ConnectionInfo c, Ice.Endpoint e, int a, int b) + { + if(remoteObserver == null) + { + remoteObserver = new RemoteObserverI(); + remoteObserver.reset(); + } + return remoteObserver; + } + + int userExceptionCount; + int retriedCount; + + RemoteObserverI remoteObserver = null; +}; diff --git a/java/test/Ice/metrics/ObserverI.java b/java/test/Ice/metrics/ObserverI.java new file mode 100644 index 00000000000..46ec62a275b --- /dev/null +++ b/java/test/Ice/metrics/ObserverI.java @@ -0,0 +1,42 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class ObserverI implements Ice.Instrumentation.Observer +{ + synchronized public void + reset() + { + total = 0; + current = 0; + failedCount = 0; + } + + synchronized public void + attach() + { + ++total; + ++current; + } + synchronized public void + detach() + { + --current; + } + synchronized public void + failed(String s) + { + ++failedCount; + } + + int total; + int current; + int failedCount; +}; diff --git a/java/test/Ice/metrics/RemoveObserverI.java b/java/test/Ice/metrics/RemoveObserverI.java new file mode 100644 index 00000000000..82ecf2c9016 --- /dev/null +++ b/java/test/Ice/metrics/RemoveObserverI.java @@ -0,0 +1,29 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class RemoteObserverI extends ObserverI implements Ice.Instrumentation.RemoteObserver +{ + public synchronized void + reset() + { + super.reset(); + replySize = 0; + } + + public synchronized void + reply(int s) + { + replySize += s; + } + + int replySize; +}; + diff --git a/java/test/Ice/metrics/ThreadObserverI.java b/java/test/Ice/metrics/ThreadObserverI.java new file mode 100644 index 00000000000..4221039ae22 --- /dev/null +++ b/java/test/Ice/metrics/ThreadObserverI.java @@ -0,0 +1,29 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class ThreadObserverI extends ObserverI implements Ice.Instrumentation.ThreadObserver +{ + public synchronized void + reset() + { + super.reset(); + states = 0; + } + + public synchronized void + stateChanged(Ice.Instrumentation.ThreadState o, Ice.Instrumentation.ThreadState n) + { + ++states; + } + + int states; +}; + |