summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/include/Ice/MetricsObserverI.h32
-rw-r--r--cpp/src/Ice/CommunicatorI.cpp2
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp4
-rw-r--r--cpp/src/Ice/ConnectionI.cpp20
-rw-r--r--cpp/src/Ice/EndpointI.cpp6
-rw-r--r--cpp/src/Ice/Incoming.cpp2
-rw-r--r--cpp/src/Ice/Instance.cpp28
-rw-r--r--cpp/src/Ice/Instance.h6
-rw-r--r--cpp/src/Ice/InstrumentationI.cpp120
-rw-r--r--cpp/src/Ice/InstrumentationI.h146
-rw-r--r--cpp/src/Ice/ObserverHelper.cpp8
-rw-r--r--cpp/src/Ice/ThreadPool.cpp2
-rw-r--r--cpp/test/Ice/metrics/AllTests.cpp37
-rw-r--r--cpp/test/Ice/metrics/Client.cpp11
-rw-r--r--cpp/test/Ice/metrics/InstrumentationI.h334
-rw-r--r--cs/src/Ice/AsyncIOThread.cs2
-rw-r--r--cs/src/Ice/CommunicatorI.cs2
-rw-r--r--cs/src/Ice/ConnectionFactory.cs4
-rw-r--r--cs/src/Ice/ConnectionI.cs20
-rw-r--r--cs/src/Ice/EndpointHostResolver.cs6
-rw-r--r--cs/src/Ice/Incoming.cs2
-rw-r--r--cs/src/Ice/Instance.cs21
-rw-r--r--cs/src/Ice/InstrumentationI.cs297
-rw-r--r--cs/src/Ice/MetricsObserverI.cs15
-rw-r--r--cs/src/Ice/ObserverHelper.cs5
-rw-r--r--cs/src/Ice/ThreadPool.cs2
-rw-r--r--cs/test/Ice/metrics/AllTests.cs39
-rw-r--r--cs/test/Ice/metrics/Client.cs5
-rw-r--r--cs/test/Ice/metrics/InstrumentationI.cs376
-rw-r--r--cs/test/Ice/metrics/Makefile2
-rw-r--r--cs/test/Ice/metrics/Makefile.mak2
-rw-r--r--distribution/lib/DistUtils.py4
-rwxr-xr-xdistribution/makedist.py53
-rw-r--r--java/src/Ice/CommunicatorI.java5
-rw-r--r--java/src/Ice/ConnectionI.java20
-rw-r--r--java/src/IceInternal/CommunicatorObserverI.java117
-rw-r--r--java/src/IceInternal/ConnectionObserverI.java11
-rw-r--r--java/src/IceInternal/DispatchObserverI.java11
-rw-r--r--java/src/IceInternal/EndpointHostResolver.java6
-rw-r--r--java/src/IceInternal/Incoming.java2
-rw-r--r--java/src/IceInternal/Instance.java22
-rw-r--r--java/src/IceInternal/InvocationObserverI.java19
-rw-r--r--java/src/IceInternal/ObserverHelper.java5
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java4
-rw-r--r--java/src/IceInternal/RemoteObserverI.java7
-rw-r--r--java/src/IceInternal/ThreadObserverI.java8
-rw-r--r--java/src/IceInternal/ThreadPool.java2
-rw-r--r--java/src/IceMX/ObserverFactory.java17
-rw-r--r--java/src/IceMX/ObserverFactoryWithDelegate.java48
-rw-r--r--java/src/IceMX/ObserverWithDelegate.java71
-rw-r--r--java/src/IceMX/ObserverWithDelegateI.java (renamed from java/src/IceMX/ObserverI.java)2
-rw-r--r--java/test/Ice/metrics/AllTests.java37
-rw-r--r--java/test/Ice/metrics/Client.java5
-rw-r--r--java/test/Ice/metrics/CommunicatorObserverI.java140
-rw-r--r--java/test/Ice/metrics/ConnectionObserverI.java37
-rw-r--r--java/test/Ice/metrics/DispatchObserverI.java37
-rw-r--r--java/test/Ice/metrics/InvocationObserverI.java53
-rw-r--r--java/test/Ice/metrics/ObserverI.java42
-rw-r--r--java/test/Ice/metrics/RemoveObserverI.java29
-rw-r--r--java/test/Ice/metrics/ThreadObserverI.java29
60 files changed, 2174 insertions, 227 deletions
diff --git a/cpp/include/Ice/MetricsObserverI.h b/cpp/include/Ice/MetricsObserverI.h
index 797e9fcbf83..1ea304df7c3 100644
--- a/cpp/include/Ice/MetricsObserverI.h
+++ b/cpp/include/Ice/MetricsObserverI.h
@@ -430,10 +430,6 @@ public:
_metrics->registerMap<MetricsType>(name, this);
}
- ObserverFactoryT(const std::string& name) : _name(name)
- {
- }
-
~ObserverFactoryT()
{
if(_metrics)
@@ -446,6 +442,10 @@ public:
getObserver(const MetricsHelperT<MetricsType>& helper)
{
IceUtil::Mutex::Lock sync(*this);
+ if(!_metrics)
+ {
+ return 0;
+ }
typename ObserverImplType::EntrySeqType metricsObjects;
for(typename MetricsMapSeqType::const_iterator p = _maps.begin(); p != _maps.end(); ++p)
@@ -470,13 +470,18 @@ public:
template<typename ObserverPtrType> ObserverImplPtrType
getObserver(const MetricsHelperT<MetricsType>& helper, const ObserverPtrType& observer)
{
- if(!observer)
+ ObserverImplPtrType old = ObserverImplPtrType::dynamicCast(observer);
+ if(!observer || !old)
{
return getObserver(helper);
}
IceUtil::Mutex::Lock sync(*this);
- ObserverImplPtrType old = ObserverImplPtrType::dynamicCast(observer);
+ if(!_metrics)
+ {
+ return 0;
+ }
+
typename ObserverImplType::EntrySeqType metricsObjects;
for(typename MetricsMapSeqType::const_iterator p = _maps.begin(); p != _maps.end(); ++p)
{
@@ -501,6 +506,7 @@ public:
template<typename SubMapMetricsType> void
registerSubMap(const std::string& subMap, MetricsMap MetricsType::* member)
{
+ assert(_metrics);
_metrics->registerSubMap<SubMapMetricsType>(_name, subMap, member);
}
@@ -514,6 +520,11 @@ public:
UpdaterPtr updater;
{
IceUtil::Mutex::Lock sync(*this);
+ if(!_metrics)
+ {
+ return;
+ }
+
std::vector<IceInternal::MetricsMapIPtr> maps = _metrics->getMaps(_name);
_maps.clear();
for(std::vector<IceInternal::MetricsMapIPtr>::const_iterator p = maps.begin(); p != maps.end(); ++p)
@@ -537,9 +548,16 @@ public:
_updater = updater;
}
+ void destroy()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ _metrics = 0;
+ _maps.clear();
+ }
+
private:
- const IceInternal::MetricsAdminIPtr _metrics;
+ IceInternal::MetricsAdminIPtr _metrics;
const std::string _name;
MetricsMapSeqType _maps;
volatile bool _enabled;
diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp
index f3e17c4dd2d..4646479d978 100644
--- a/cpp/src/Ice/CommunicatorI.cpp
+++ b/cpp/src/Ice/CommunicatorI.cpp
@@ -261,7 +261,7 @@ Ice::CommunicatorI::getStats() const
Ice::Instrumentation::CommunicatorObserverPtr
Ice::CommunicatorI::getObserver() const
{
- return _instance->initializationData().observer;
+ return _instance->getObserver();
}
RouterPrx
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 60534489cd9..822e26b84ec 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -225,7 +225,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
// Try to establish the connection to the connectors.
//
DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
- const CommunicatorObserverPtr& obsv = _instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _instance->getObserver();
vector<ConnectorInfo>::const_iterator q;
for(q = connectors.begin(); q != connectors.end(); ++q)
{
@@ -1122,7 +1122,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector()
try
{
- const CommunicatorObserverPtr& obsv = _factory->_instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _factory->_instance->getObserver();
if(obsv)
{
_observer = obsv->getConnectionEstablishmentObserver(_iter->endpoint, _iter->connector->toString());
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 8091d2f2233..fbf9b2ea5f6 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -525,11 +525,11 @@ Ice::ConnectionI::updateObserver()
return;
}
- assert(_instance->initializationData().observer);
- _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(),
- _endpoint,
- toConnectionState(_state),
- _observer.get()));
+ assert(_instance->getObserver());
+ _observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ toConnectionState(_state),
+ _observer.get()));
}
void
@@ -2188,16 +2188,16 @@ Ice::ConnectionI::setState(State state)
}
}
- if(_instance->initializationData().observer)
+ if(_instance->getObserver())
{
ConnectionState oldState = toConnectionState(_state);
ConnectionState newState = toConnectionState(state);
if(oldState != newState)
{
- _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(),
- _endpoint,
- newState,
- _observer.get()));
+ _observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ newState,
+ _observer.get()));
}
if(_observer && state == StateClosed && _exception.get())
{
diff --git a/cpp/src/Ice/EndpointI.cpp b/cpp/src/Ice/EndpointI.cpp
index a27c4082faf..7d70754021b 100644
--- a/cpp/src/Ice/EndpointI.cpp
+++ b/cpp/src/Ice/EndpointI.cpp
@@ -139,7 +139,7 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En
}
ObserverHelperT<> observer;
- const CommunicatorObserverPtr& obsv = _instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _instance->getObserver();
if(obsv)
{
observer.attach(obsv->getEndpointLookupObserver(endpoint));
@@ -201,7 +201,7 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En
entry.endpoint = endpoint;
entry.callback = callback;
- const CommunicatorObserverPtr& obsv = _instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _instance->getObserver();
if(obsv)
{
entry.observer = obsv->getEndpointLookupObserver(endpoint);
@@ -310,7 +310,7 @@ void
IceInternal::EndpointHostResolver::updateObserver()
{
Lock sync(*this);
- const CommunicatorObserverPtr& obsv = _instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _instance->getObserver();
if(obsv)
{
_observer.attach(obsv->getThreadObserver("Communicator", name(), ThreadStateIdle, _observer.get()));
diff --git a/cpp/src/Ice/Incoming.cpp b/cpp/src/Ice/Incoming.cpp
index 928c3c50088..e516940c84f 100644
--- a/cpp/src/Ice/Incoming.cpp
+++ b/cpp/src/Ice/Incoming.cpp
@@ -592,7 +592,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
_current.ctx.insert(_current.ctx.end(), pr);
}
- const CommunicatorObserverPtr& obsv = _is->instance()->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _is->instance()->getObserver();
if(obsv)
{
// Read the parameter encapsulation size.
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp
index f5a61cb9995..19fa9701352 100644
--- a/cpp/src/Ice/Instance.cpp
+++ b/cpp/src/Ice/Instance.cpp
@@ -1148,18 +1148,20 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi
// Setup the communicator observer only if the user didn't already set an
// Ice observer resolver and if the admininistrative endpoints are set.
//
- if(!_initData.observer &&
- (_adminFacetFilter.empty() || _adminFacetFilter.find("Metrics") != _adminFacetFilter.end()) &&
+ if((_adminFacetFilter.empty() || _adminFacetFilter.find("Metrics") != _adminFacetFilter.end()) &&
_initData.properties->getProperty("Ice.Admin.Endpoints") != "")
{
- CommunicatorObserverIPtr observer = new CommunicatorObserverI(_metricsAdmin);
- _initData.observer = observer;
+ _observer = new CommunicatorObserverI(_metricsAdmin, _initData.observer);
//
// Make sure the admin plugin receives property updates.
//
props->addUpdateCallback(_metricsAdmin);
}
+ else
+ {
+ _observer = _initData.observer;
+ }
__setNoDelete(false);
}
@@ -1229,10 +1231,10 @@ IceInternal::Instance::finishSetup(int& argc, char* argv[])
//
// Set observer updater
//
- if(_initData.observer)
+ if(_observer)
{
- theCollector->updateObserver(_initData.observer);
- _initData.observer->setObserverUpdater(new ObserverUpdaterI(this));
+ theCollector->updateObserver(_observer);
+ _observer->setObserverUpdater(new ObserverUpdaterI(this));
}
//
@@ -1398,18 +1400,20 @@ IceInternal::Instance::destroy()
_retryQueue->destroy();
}
- if(_initData.observer && theCollector)
+ if(_observer && theCollector)
{
- theCollector->clearObserver(_initData.observer);
+ theCollector->clearObserver(_observer);
}
if(_metricsAdmin)
{
_metricsAdmin->destroy();
_metricsAdmin = 0;
- if(CommunicatorObserverIPtr::dynamicCast(_initData.observer))
+
+ // Break cyclic reference counts. Don't clear _observer, it's immutable.
+ if(_observer)
{
- _initData.observer = 0; // Clear cyclic reference counts.
+ CommunicatorObserverIPtr::dynamicCast(_observer)->destroy();
}
}
@@ -1567,7 +1571,7 @@ IceInternal::Instance::updateThreadObservers()
_endpointHostResolver->updateObserver();
}
assert(theCollector);
- theCollector->updateObserver(_initData.observer);
+ theCollector->updateObserver(_observer);
}
catch(const Ice::CommunicatorDestroyedException&)
{
diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h
index 16aa25648b0..34261196016 100644
--- a/cpp/src/Ice/Instance.h
+++ b/cpp/src/Ice/Instance.h
@@ -93,6 +93,11 @@ public:
void addAdminFacet(const Ice::ObjectPtr&, const std::string&);
Ice::ObjectPtr removeAdminFacet(const std::string&);
Ice::ObjectPtr findAdminFacet(const std::string&);
+
+ const Ice::Instrumentation::CommunicatorObserverPtr& getObserver() const
+ {
+ return _observer;
+ }
const Ice::ImplicitContextIPtr& getImplicitContext() const
{
@@ -157,6 +162,7 @@ private:
Ice::Identity _adminIdentity;
std::set<std::string> _adminFacetFilter;
IceInternal::MetricsAdminIPtr _metricsAdmin;
+ Ice::Instrumentation::CommunicatorObserverPtr _observer;
};
class ProcessI : public Ice::Process
diff --git a/cpp/src/Ice/InstrumentationI.cpp b/cpp/src/Ice/InstrumentationI.cpp
index 82598bbf1dc..fcfae4200a7 100644
--- a/cpp/src/Ice/InstrumentationI.cpp
+++ b/cpp/src/Ice/InstrumentationI.cpp
@@ -694,48 +694,81 @@ void
ConnectionObserverI::sentBytes(Int num)
{
forEach(add(&ConnectionMetrics::sentBytes, num));
+ if(_delegate)
+ {
+ _delegate->sentBytes(num);
+ }
}
void
ConnectionObserverI::receivedBytes(Int num)
{
forEach(add(&ConnectionMetrics::receivedBytes, num));
+ if(_delegate)
+ {
+ _delegate->receivedBytes(num);
+ }
}
void
ThreadObserverI::stateChanged(ThreadState oldState, ThreadState newState)
{
forEach(ThreadStateChanged(oldState, newState));
+ if(_delegate)
+ {
+ _delegate->stateChanged(oldState, newState);
+ }
+
}
void
DispatchObserverI::userException()
{
forEach(inc(&DispatchMetrics::userException));
+ if(_delegate)
+ {
+ _delegate->userException();
+ }
}
void
DispatchObserverI::reply(Int size)
{
forEach(add(&DispatchMetrics::replySize, size));
+ if(_delegate)
+ {
+ _delegate->reply(size);
+ }
}
void
RemoteObserverI::reply(Int size)
{
forEach(add(&RemoteMetrics::replySize, size));
+ if(_delegate)
+ {
+ _delegate->reply(size);
+ }
}
void
InvocationObserverI::retried()
{
forEach(inc(&InvocationMetrics::retry));
+ if(_delegate)
+ {
+ _delegate->retried();
+ }
}
void
InvocationObserverI::userException()
{
forEach(inc(&InvocationMetrics::userException));
+ if(_delegate)
+ {
+ _delegate->userException();
+ }
}
RemoteObserverPtr
@@ -746,7 +779,14 @@ InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection,
{
try
{
- return getObserver<RemoteObserverI>("Remote", RemoteInvocationHelper(connection, endpoint, requestId, size));
+ RemoteObserverPtr delegate;
+ if(_delegate)
+ {
+ delegate = _delegate->getRemoteObserver(connection, endpoint, requestId, size);
+ }
+ return getObserver<RemoteObserverI>("Remote",
+ RemoteInvocationHelper(connection, endpoint, requestId, size),
+ delegate);
}
catch(const exception&)
{
@@ -754,8 +794,11 @@ InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection,
return 0;
}
-CommunicatorObserverI::CommunicatorObserverI(const IceInternal::MetricsAdminIPtr& metrics) :
+CommunicatorObserverI::CommunicatorObserverI(const IceInternal::MetricsAdminIPtr& metrics,
+ const Ice::Instrumentation::CommunicatorObserverPtr& delegate) :
_metrics(metrics),
+ _logger(metrics->getLogger()),
+ _delegate(delegate),
_connections(metrics, "Connection"),
_dispatch(metrics, "Dispatch"),
_invocations(metrics, "Invocation"),
@@ -771,6 +814,10 @@ CommunicatorObserverI::setObserverUpdater(const ObserverUpdaterPtr& updater)
{
_connections.setUpdater(newUpdater(updater, &ObserverUpdater::updateConnectionObservers));
_threads.setUpdater(newUpdater(updater, &ObserverUpdater::updateThreadObservers));
+ if(_delegate)
+ {
+ _delegate->setObserverUpdater(updater);
+ }
}
ObserverPtr
@@ -780,11 +827,16 @@ CommunicatorObserverI::getConnectionEstablishmentObserver(const EndpointPtr& end
{
try
{
- return _connects.getObserver(EndpointHelper(endpt, connector));
+ ObserverPtr delegate;
+ if(_delegate)
+ {
+ delegate = _delegate->getConnectionEstablishmentObserver(endpt, connector);
+ }
+ return _connects.getObserver(EndpointHelper(endpt, connector), delegate);
}
catch(const exception& ex)
{
- Error error(_metrics->getLogger());
+ Error error(_logger);
error << "unexpected exception trying to obtain observer:\n" << ex;
}
}
@@ -798,11 +850,16 @@ CommunicatorObserverI::getEndpointLookupObserver(const EndpointPtr& endpt)
{
try
{
- return _endpointLookups.getObserver(EndpointHelper(endpt));
+ ObserverPtr delegate;
+ if(_delegate)
+ {
+ delegate = _delegate->getEndpointLookupObserver(endpt);
+ }
+ return _endpointLookups.getObserver(EndpointHelper(endpt), delegate);
}
catch(const exception& ex)
{
- Error error(_metrics->getLogger());
+ Error error(_logger);
error << "unexpected exception trying to obtain observer:\n" << ex;
}
}
@@ -819,11 +876,17 @@ CommunicatorObserverI::getConnectionObserver(const ConnectionInfoPtr& con,
{
try
{
- return _connections.getObserver(ConnectionHelper(con, endpt, state), observer);
+ ConnectionObserverPtr delegate;
+ ConnectionObserverI* o = dynamic_cast<ConnectionObserverI*>(observer.get());
+ if(_delegate)
+ {
+ delegate = _delegate->getConnectionObserver(con, endpt, state, o ? o->getDelegate() : observer);
+ }
+ return _connections.getObserver(ConnectionHelper(con, endpt, state), delegate, observer);
}
catch(const exception& ex)
{
- Error error(_metrics->getLogger());
+ Error error(_logger);
error << "unexpected exception trying to obtain observer:\n" << ex;
}
}
@@ -840,11 +903,17 @@ CommunicatorObserverI::getThreadObserver(const string& parent,
{
try
{
- return _threads.getObserver(ThreadHelper(parent, id, state), observer);
+ ThreadObserverPtr delegate;
+ ThreadObserverI* o = dynamic_cast<ThreadObserverI*>(observer.get());
+ if(_delegate)
+ {
+ delegate = _delegate->getThreadObserver(parent, id, state, o ? o->getDelegate() : observer);
+ }
+ return _threads.getObserver(ThreadHelper(parent, id, state), delegate, observer);
}
catch(const exception& ex)
{
- Error error(_metrics->getLogger());
+ Error error(_logger);
error << "unexpected exception trying to obtain observer:\n" << ex;
}
}
@@ -858,11 +927,16 @@ CommunicatorObserverI::getInvocationObserver(const ObjectPrx& proxy, const strin
{
try
{
- return _invocations.getObserver(InvocationHelper(proxy, op, ctx));
+ InvocationObserverPtr delegate;
+ if(_delegate)
+ {
+ delegate = _delegate->getInvocationObserver(proxy, op, ctx);
+ }
+ return _invocations.getObserver(InvocationHelper(proxy, op, ctx), delegate);
}
catch(const exception& ex)
{
- Error error(_metrics->getLogger());
+ Error error(_logger);
error << "unexpected exception trying to obtain observer:\n" << ex;
}
}
@@ -876,11 +950,16 @@ CommunicatorObserverI::getDispatchObserver(const Current& current, int size)
{
try
{
- return _dispatch.getObserver(DispatchHelper(current, size));
+ DispatchObserverPtr delegate;
+ if(_delegate)
+ {
+ delegate = _delegate->getDispatchObserver(current, size);
+ }
+ return _dispatch.getObserver(DispatchHelper(current, size), delegate);
}
catch(const exception& ex)
{
- Error error(_metrics->getLogger());
+ Error error(_logger);
error << "unexpected exception trying to obtain observer:\n" << ex;
}
}
@@ -890,5 +969,18 @@ CommunicatorObserverI::getDispatchObserver(const Current& current, int size)
const IceInternal::MetricsAdminIPtr&
CommunicatorObserverI::getMetricsAdmin() const
{
+ assert(_metrics);
return _metrics;
}
+
+void
+CommunicatorObserverI::destroy()
+{
+ _metrics = 0;
+ _connections.destroy();
+ _dispatch.destroy();
+ _invocations.destroy();
+ _threads.destroy();
+ _connects.destroy();
+ _endpointLookups.destroy();
+}
diff --git a/cpp/src/Ice/InstrumentationI.h b/cpp/src/Ice/InstrumentationI.h
index 9f6b61327b9..1db093feae2 100644
--- a/cpp/src/Ice/InstrumentationI.h
+++ b/cpp/src/Ice/InstrumentationI.h
@@ -16,6 +16,109 @@
namespace IceInternal
{
+template<typename T, typename O> class ObserverWithDelegateT : public IceMX::ObserverT<T>, virtual public O
+{
+public:
+
+ typedef O ObserverType;
+ typedef typename IceInternal::Handle<O> ObserverPtrType;
+
+ virtual void
+ attach()
+ {
+ IceMX::ObserverT<T>::attach();
+ if(_delegate)
+ {
+ _delegate->attach();
+ }
+ }
+
+ virtual void
+ detach()
+ {
+ IceMX::ObserverT<T>::detach();
+ if(_delegate)
+ {
+ _delegate->detach();
+ }
+ }
+
+ virtual void
+ failed(const std::string& exceptionName)
+ {
+ IceMX::ObserverT<T>::failed(exceptionName);
+ if(_delegate)
+ {
+ _delegate->failed(exceptionName);
+ }
+ }
+
+ ObserverPtrType
+ getDelegate() const
+ {
+ return _delegate;
+ }
+
+ void
+ setDelegate(ObserverPtrType delegate)
+ {
+ _delegate = delegate;
+ }
+
+ template<typename ObserverImpl, typename ObserverMetricsType, typename ObserverPtrType> ObserverPtrType
+ getObserver(const std::string& mapName, const IceMX::MetricsHelperT<ObserverMetricsType>& helper,
+ const ObserverPtrType& del)
+ {
+ IceInternal::Handle<ObserverImpl> obsv = IceMX::ObserverT<T>::template getObserver<ObserverImpl>(mapName,
+ helper);
+ if(obsv)
+ {
+ obsv->setDelegate(del);
+ return obsv;
+ }
+ return del;
+ }
+
+protected:
+
+ ObserverPtrType _delegate;
+};
+
+template<typename T> class ObserverFactoryWithDelegateT : public IceMX::ObserverFactoryT<T>
+{
+public:
+
+ ObserverFactoryWithDelegateT(const IceInternal::MetricsAdminIPtr& metrics, const std::string& name) :
+ IceMX::ObserverFactoryT<T>(metrics, name)
+ {
+ }
+
+ template<typename ObserverMetricsType, typename ObserverPtrType> ObserverPtrType
+ getObserver(const IceMX::MetricsHelperT<ObserverMetricsType>& helper, const ObserverPtrType& del)
+ {
+ IceInternal::Handle<T> obsv = IceMX::ObserverFactoryT<T>::getObserver(helper);
+ if(obsv)
+ {
+ obsv->setDelegate(del);
+ return obsv;
+ }
+ return del;
+ }
+
+ template<typename ObserverMetricsType, typename ObserverPtrType> ObserverPtrType
+ getObserver(const IceMX::MetricsHelperT<ObserverMetricsType>& helper, const ObserverPtrType& del,
+ const ObserverPtrType& old)
+ {
+ IceInternal::Handle<T> obsv = IceMX::ObserverFactoryT<T>::getObserver(helper, old);
+ if(obsv)
+ {
+ obsv->setDelegate(del);
+ return obsv;
+ }
+ return del;
+ }
+};
+
template<typename Helper>
void addEndpointAttributes(typename Helper::Attributes& attrs)
{
@@ -49,8 +152,8 @@ void addConnectionAttributes(typename Helper::Attributes& attrs)
addEndpointAttributes<Helper>(attrs);
}
-class ConnectionObserverI : public Ice::Instrumentation::ConnectionObserver,
- public IceMX::ObserverT<IceMX::ConnectionMetrics>
+class ConnectionObserverI : public ObserverWithDelegateT<IceMX::ConnectionMetrics,
+ Ice::Instrumentation::ConnectionObserver>
{
public:
@@ -58,16 +161,14 @@ public:
virtual void receivedBytes(Ice::Int);
};
-class ThreadObserverI : public Ice::Instrumentation::ThreadObserver,
- public IceMX::ObserverT<IceMX::ThreadMetrics>
+class ThreadObserverI : public ObserverWithDelegateT<IceMX::ThreadMetrics, Ice::Instrumentation::ThreadObserver>
{
public:
virtual void stateChanged(Ice::Instrumentation::ThreadState, Ice::Instrumentation::ThreadState);
};
-class DispatchObserverI : public Ice::Instrumentation::DispatchObserver,
- public IceMX::ObserverT<IceMX::DispatchMetrics>
+class DispatchObserverI : public ObserverWithDelegateT<IceMX::DispatchMetrics, Ice::Instrumentation::DispatchObserver>
{
public:
@@ -76,14 +177,15 @@ public:
virtual void reply(Ice::Int);
};
-class RemoteObserverI : public Ice::Instrumentation::RemoteObserver,
- public IceMX::ObserverT<IceMX::RemoteMetrics>
+class RemoteObserverI : public ObserverWithDelegateT<IceMX::RemoteMetrics, Ice::Instrumentation::RemoteObserver>
{
+public:
+
virtual void reply(Ice::Int);
};
-class InvocationObserverI : public Ice::Instrumentation::InvocationObserver,
- public IceMX::ObserverT<IceMX::InvocationMetrics>
+class InvocationObserverI : public ObserverWithDelegateT<IceMX::InvocationMetrics,
+ Ice::Instrumentation::InvocationObserver>
{
public:
@@ -95,11 +197,15 @@ public:
const Ice::EndpointPtr&, Ice::Int, Ice::Int);
};
+typedef ObserverWithDelegateT<IceMX::Metrics, Ice::Instrumentation::Observer> ObserverI;
+
class ICE_API CommunicatorObserverI : public Ice::Instrumentation::CommunicatorObserver
{
public:
- CommunicatorObserverI(const IceInternal::MetricsAdminIPtr&);
+ CommunicatorObserverI(const IceInternal::MetricsAdminIPtr&,
+ const Ice::Instrumentation::CommunicatorObserverPtr& =
+ Ice::Instrumentation::CommunicatorObserverPtr());
virtual void setObserverUpdater(const Ice::Instrumentation::ObserverUpdaterPtr&);
@@ -126,16 +232,20 @@ public:
const IceInternal::MetricsAdminIPtr& getMetricsAdmin() const;
+ void destroy();
+
private:
- const IceInternal::MetricsAdminIPtr _metrics;
+ IceInternal::MetricsAdminIPtr _metrics;
+ Ice::LoggerPtr _logger;
+ const Ice::Instrumentation::CommunicatorObserverPtr _delegate;
- IceMX::ObserverFactoryT<ConnectionObserverI> _connections;
- IceMX::ObserverFactoryT<DispatchObserverI> _dispatch;
- IceMX::ObserverFactoryT<InvocationObserverI> _invocations;
- IceMX::ObserverFactoryT<ThreadObserverI> _threads;
- IceMX::ObserverFactoryT<IceMX::ObserverI> _connects;
- IceMX::ObserverFactoryT<IceMX::ObserverI> _endpointLookups;
+ ObserverFactoryWithDelegateT<ConnectionObserverI> _connections;
+ ObserverFactoryWithDelegateT<DispatchObserverI> _dispatch;
+ ObserverFactoryWithDelegateT<InvocationObserverI> _invocations;
+ ObserverFactoryWithDelegateT<ThreadObserverI> _threads;
+ ObserverFactoryWithDelegateT<ObserverI> _connects;
+ ObserverFactoryWithDelegateT<ObserverI> _endpointLookups;
};
typedef IceUtil::Handle<CommunicatorObserverI> CommunicatorObserverIPtr;
diff --git a/cpp/src/Ice/ObserverHelper.cpp b/cpp/src/Ice/ObserverHelper.cpp
index ca148bff6da..4215d77eabd 100644
--- a/cpp/src/Ice/ObserverHelper.cpp
+++ b/cpp/src/Ice/ObserverHelper.cpp
@@ -25,7 +25,7 @@ Ice::Context emptyCtx;
IceInternal::InvocationObserver::InvocationObserver(IceProxy::Ice::Object* proxy, const string& op, const Context* ctx)
{
- const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->getObserver();
if(!obsv)
{
return;
@@ -43,7 +43,7 @@ IceInternal::InvocationObserver::InvocationObserver(IceProxy::Ice::Object* proxy
IceInternal::InvocationObserver::InvocationObserver(IceInternal::Instance* instance, const string& op)
{
- const CommunicatorObserverPtr& obsv = instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = instance->getObserver();
if(!obsv)
{
return;
@@ -55,7 +55,7 @@ IceInternal::InvocationObserver::InvocationObserver(IceInternal::Instance* insta
void
IceInternal::InvocationObserver::attach(IceProxy::Ice::Object* proxy, const string& op, const Context* ctx)
{
- const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->getObserver();
if(!obsv)
{
return;
@@ -74,7 +74,7 @@ IceInternal::InvocationObserver::attach(IceProxy::Ice::Object* proxy, const stri
void
IceInternal::InvocationObserver::attach(IceInternal::Instance* instance, const string& op)
{
- const CommunicatorObserverPtr& obsv = instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = instance->getObserver();
if(!obsv)
{
return;
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 62652c786d2..2234e8c7ceb 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -1128,7 +1128,7 @@ void
IceInternal::ThreadPool::EventHandlerThread::updateObserver()
{
// Must be called with the thread pool mutex locked
- const CommunicatorObserverPtr& obsv = _pool->_instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _pool->_instance->getObserver();
if(obsv)
{
_observer.attach(obsv->getThreadObserver(_pool->_prefix, name(), _state, _observer.get()));
diff --git a/cpp/test/Ice/metrics/AllTests.cpp b/cpp/test/Ice/metrics/AllTests.cpp
index a181f43fbb1..ea097a349ac 100644
--- a/cpp/test/Ice/metrics/AllTests.cpp
+++ b/cpp/test/Ice/metrics/AllTests.cpp
@@ -9,6 +9,7 @@
#include <Ice/Ice.h>
#include <TestCommon.h>
+#include <InstrumentationI.h>
#include <Test.h>
using namespace std;
@@ -368,7 +369,7 @@ toMap(const IceMX::MetricsMap& mmap)
}
MetricsPrx
-allTests(const Ice::CommunicatorPtr& communicator)
+allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPtr& obsv)
{
MetricsPrx metrics = MetricsPrx::checkedCast(communicator->stringToProxy("metrics:default -p 12010"));
@@ -1047,5 +1048,39 @@ allTests(const Ice::CommunicatorPtr& communicator)
cout << "ok" << endl;
+ cout << "testing instrumentation observer delegate... " << flush;
+
+ test(obsv->threadObserver->total > 0);
+ test(obsv->connectionObserver->total > 0);
+ test(obsv->connectionEstablishmentObserver->total > 0);
+ test(obsv->endpointLookupObserver->total > 0);
+ test(obsv->dispatchObserver->total > 0);
+ test(obsv->invocationObserver->total > 0);
+ test(obsv->invocationObserver->remoteObserver->total > 0);
+
+ test(obsv->threadObserver->current > 0);
+ test(obsv->connectionObserver->current > 0);
+ test(obsv->connectionEstablishmentObserver->current == 0);
+ test(obsv->endpointLookupObserver->current == 0);
+ test(obsv->dispatchObserver->current == 0);
+ test(obsv->invocationObserver->current == 0);
+ test(obsv->invocationObserver->remoteObserver->current == 0);
+
+ test(obsv->threadObserver->failedCount == 0);
+ test(obsv->connectionObserver->failedCount > 0);
+ test(obsv->connectionEstablishmentObserver->failedCount > 0);
+ test(obsv->endpointLookupObserver->failedCount > 0);
+ //test(obsv->dispatchObserver->failedCount > 0);
+ test(obsv->invocationObserver->failedCount > 0);
+ test(obsv->invocationObserver->remoteObserver->failedCount > 0);
+
+ test(obsv->threadObserver->states > 0);
+ test(obsv->connectionObserver->received > 0 && obsv->connectionObserver->sent > 0);
+ //test(obsv->dispatchObserver->userExceptionCount > 0);
+ test(obsv->invocationObserver->userExceptionCount > 0 && obsv->invocationObserver->retriedCount > 0);
+ test(obsv->invocationObserver->remoteObserver->replySize > 0);
+
+ cout << "ok" << endl;
+
return metrics;
}
diff --git a/cpp/test/Ice/metrics/Client.cpp b/cpp/test/Ice/metrics/Client.cpp
index f30ad6032ad..4f541c2f7cc 100644
--- a/cpp/test/Ice/metrics/Client.cpp
+++ b/cpp/test/Ice/metrics/Client.cpp
@@ -10,6 +10,7 @@
#include <Ice/Ice.h>
#include <TestCommon.h>
#include <Test.h>
+#include <InstrumentationI.h>
DEFINE_TEST("client")
@@ -17,10 +18,10 @@ using namespace std;
using namespace Test;
int
-run(int, char**, const Ice::CommunicatorPtr& communicator)
+run(int, char**, const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPtr& observer)
{
- MetricsPrx allTests(const Ice::CommunicatorPtr&);
- MetricsPrx metrics = allTests(communicator);
+ MetricsPrx allTests(const Ice::CommunicatorPtr&, const CommunicatorObserverIPtr&);
+ MetricsPrx metrics = allTests(communicator, observer);
metrics->shutdown();
return EXIT_SUCCESS;
}
@@ -40,8 +41,10 @@ main(int argc, char* argv[])
initData.properties->setProperty("Ice.Admin.DelayCreation", "1");
initData.properties->setProperty("Ice.Warn.Connections", "0");
initData.properties->setProperty("Ice.MessageSizeMax", "50000");
+ CommunicatorObserverIPtr observer = new CommunicatorObserverI();
+ initData.observer = observer;
communicator = Ice::initialize(argc, argv, initData);
- status = run(argc, argv, communicator);
+ status = run(argc, argv, communicator, observer);
}
catch(const Ice::Exception& ex)
{
diff --git a/cpp/test/Ice/metrics/InstrumentationI.h b/cpp/test/Ice/metrics/InstrumentationI.h
new file mode 100644
index 00000000000..92ff7e1d11d
--- /dev/null
+++ b/cpp/test/Ice/metrics/InstrumentationI.h
@@ -0,0 +1,334 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#ifndef INSTRUMENTATION_I_H
+#define INSTRUMENTATION_I_H
+
+#include <Ice/Instrumentation.h>
+
+class ObserverI : virtual public Ice::Instrumentation::Observer, public IceUtil::Mutex
+{
+public:
+
+ virtual void
+ reset()
+ {
+ total = 0;
+ current = 0;
+ failedCount = 0;
+ }
+
+ virtual void
+ attach()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ++total;
+ ++current;
+ }
+ virtual void
+ detach()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ --current;
+ }
+ virtual void
+ failed(const std::string&)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ++failedCount;
+ }
+
+ Ice::Int total;
+ Ice::Int current;
+ Ice::Int failedCount;
+};
+
+class ConnectionObserverI : public Ice::Instrumentation::ConnectionObserver, public ObserverI
+{
+public:
+
+ virtual void
+ reset()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ObserverI::reset();
+ received = 0;
+ sent = 0;
+ }
+
+ virtual void
+ sentBytes(Ice::Int s)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ sent += s;
+ }
+
+ virtual void
+ receivedBytes(Ice::Int s)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ received += s;
+ }
+
+ Ice::Int sent;
+ Ice::Int received;
+};
+
+class ThreadObserverI : public Ice::Instrumentation::ThreadObserver, public ObserverI
+{
+public:
+
+ virtual void
+ reset()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ObserverI::reset();
+ states = 0;
+ }
+
+ virtual void
+ stateChanged(Ice::Instrumentation::ThreadState, Ice::Instrumentation::ThreadState)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ++states;
+ }
+
+ Ice::Int states;
+};
+
+class DispatchObserverI : public Ice::Instrumentation::DispatchObserver, public ObserverI
+{
+public:
+
+ virtual void reset()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ObserverI::reset();
+ userExceptionCount = 0;
+ replySize = 0;
+ }
+
+ virtual void
+ userException()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ++userExceptionCount;
+ }
+
+ virtual void
+ reply(Ice::Int s)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ replySize += s;
+ }
+
+ Ice::Int userExceptionCount;
+ Ice::Int replySize;
+};
+
+class RemoteObserverI : public Ice::Instrumentation::RemoteObserver, public ObserverI
+{
+public:
+
+ virtual void
+ reset()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ObserverI::reset();
+ replySize = 0;
+ }
+
+ virtual void
+ reply(Ice::Int s)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ replySize += s;
+ }
+
+ Ice::Int replySize;
+};
+
+class InvocationObserverI : public Ice::Instrumentation::InvocationObserver, public ObserverI
+{
+public:
+
+ virtual void reset()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ObserverI::reset();
+ retriedCount = 0;
+ userExceptionCount = 0;
+ if(remoteObserver)
+ {
+ remoteObserver->reset();
+ }
+ }
+
+ virtual void
+ retried()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ++retriedCount;
+ }
+
+ virtual void
+ userException()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ++userExceptionCount;
+ }
+
+ virtual Ice::Instrumentation::RemoteObserverPtr
+ getRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& e, Ice::Int, Ice::Int)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ if(!remoteObserver)
+ {
+ remoteObserver = new RemoteObserverI();
+ remoteObserver->reset();
+ }
+ return remoteObserver;
+ }
+
+ Ice::Int userExceptionCount;
+ Ice::Int retriedCount;
+
+ IceUtil::Handle<RemoteObserverI> remoteObserver;
+};
+
+class CommunicatorObserverI : public Ice::Instrumentation::CommunicatorObserver, public IceUtil::Mutex
+{
+public:
+
+ virtual void
+ setObserverUpdater(const Ice::Instrumentation::ObserverUpdaterPtr& u)
+ {
+ updater = u;
+ }
+
+ virtual Ice::Instrumentation::ObserverPtr
+ getConnectionEstablishmentObserver(const Ice::EndpointPtr&, const std::string&)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ if(!connectionEstablishmentObserver)
+ {
+ connectionEstablishmentObserver = new ObserverI();
+ connectionEstablishmentObserver->reset();
+ }
+ return connectionEstablishmentObserver;
+ }
+
+
+ virtual Ice::Instrumentation::ObserverPtr
+ getEndpointLookupObserver(const Ice::EndpointPtr&)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ if(!endpointLookupObserver)
+ {
+ endpointLookupObserver = new ObserverI();
+ endpointLookupObserver->reset();
+ }
+ return endpointLookupObserver;
+ }
+
+ virtual Ice::Instrumentation::ConnectionObserverPtr
+ getConnectionObserver(const Ice::ConnectionInfoPtr&,
+ const Ice::EndpointPtr&,
+ Ice::Instrumentation::ConnectionState,
+ const Ice::Instrumentation::ConnectionObserverPtr& old)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ test(!old || dynamic_cast<ConnectionObserverI*>(old.get()));
+ if(!connectionObserver)
+ {
+ connectionObserver = new ConnectionObserverI();
+ connectionObserver->reset();
+ }
+ return connectionObserver;
+ }
+
+ virtual Ice::Instrumentation::ThreadObserverPtr
+ getThreadObserver(const std::string&, const std::string&, Ice::Instrumentation::ThreadState,
+ const Ice::Instrumentation::ThreadObserverPtr& old)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ test(!old || dynamic_cast<ThreadObserverI*>(old.get()));
+ if(!threadObserver)
+ {
+ threadObserver = new ThreadObserverI();
+ threadObserver->reset();
+ }
+ return threadObserver;
+ }
+
+ virtual Ice::Instrumentation::InvocationObserverPtr
+ getInvocationObserver(const Ice::ObjectPrx&, const std::string&, const Ice::Context&)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ if(!invocationObserver)
+ {
+ invocationObserver = new InvocationObserverI();
+ invocationObserver->reset();
+ }
+ return invocationObserver;
+ }
+
+ virtual Ice::Instrumentation::DispatchObserverPtr
+ getDispatchObserver(const Ice::Current&, Ice::Int)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ if(!dispatchObserver)
+ {
+ dispatchObserver = new DispatchObserverI();
+ dispatchObserver->reset();
+ }
+ return dispatchObserver;
+ }
+
+ void reset()
+ {
+ if(connectionEstablishmentObserver)
+ {
+ connectionEstablishmentObserver->reset();
+ }
+ if(endpointLookupObserver)
+ {
+ endpointLookupObserver->reset();
+ }
+ if(connectionObserver)
+ {
+ connectionObserver->reset();
+ }
+ if(threadObserver)
+ {
+ threadObserver->reset();
+ }
+ if(invocationObserver)
+ {
+ invocationObserver->reset();
+ }
+ if(dispatchObserver)
+ {
+ dispatchObserver->reset();
+ }
+ }
+
+ Ice::Instrumentation::ObserverUpdaterPtr updater;
+
+ IceUtil::Handle<ObserverI> connectionEstablishmentObserver;
+ IceUtil::Handle<ObserverI> endpointLookupObserver;
+ IceUtil::Handle<ConnectionObserverI> connectionObserver;
+ IceUtil::Handle<ThreadObserverI> threadObserver;
+ IceUtil::Handle<InvocationObserverI> invocationObserver;
+ IceUtil::Handle<DispatchObserverI> dispatchObserver;
+};
+
+typedef IceUtil::Handle<CommunicatorObserverI> CommunicatorObserverIPtr;
+
+#endif
diff --git a/cs/src/Ice/AsyncIOThread.cs b/cs/src/Ice/AsyncIOThread.cs
index ebe6ea43562..cacff0a48f3 100644
--- a/cs/src/Ice/AsyncIOThread.cs
+++ b/cs/src/Ice/AsyncIOThread.cs
@@ -45,7 +45,7 @@ namespace IceInternal
_m.Lock();
try
{
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
if(obsv != null)
{
_observer = obsv.getThreadObserver("Communicator",
diff --git a/cs/src/Ice/CommunicatorI.cs b/cs/src/Ice/CommunicatorI.cs
index df3ff1e1653..a910a012424 100644
--- a/cs/src/Ice/CommunicatorI.cs
+++ b/cs/src/Ice/CommunicatorI.cs
@@ -126,7 +126,7 @@ namespace Ice
public Ice.Instrumentation.CommunicatorObserver getObserver()
{
- return instance_.initializationData().observer;
+ return instance_.getObserver();
}
public RouterPrx getDefaultRouter()
diff --git a/cs/src/Ice/ConnectionFactory.cs b/cs/src/Ice/ConnectionFactory.cs
index 886dd4dd2de..848d59d323c 100644
--- a/cs/src/Ice/ConnectionFactory.cs
+++ b/cs/src/Ice/ConnectionFactory.cs
@@ -238,7 +238,7 @@ namespace IceInternal
// Try to establish the connection to the connectors.
//
DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
ConnectorInfo ci = null;
for(int i = 0; i < connectors.Count; ++i)
{
@@ -1251,7 +1251,7 @@ namespace IceInternal
Debug.Assert(_iter < _connectors.Count);
_current = _connectors[_iter++];
- Ice.Instrumentation.CommunicatorObserver obsv = _factory._instance.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _factory._instance.getObserver();
if(obsv != null)
{
_observer = obsv.getConnectionEstablishmentObserver(_current.endpoint,
diff --git a/cs/src/Ice/ConnectionI.cs b/cs/src/Ice/ConnectionI.cs
index f03ce804672..8b1facc56e0 100644
--- a/cs/src/Ice/ConnectionI.cs
+++ b/cs/src/Ice/ConnectionI.cs
@@ -325,11 +325,11 @@ namespace Ice
return;
}
- Debug.Assert(_instance.initializationData().observer != null);
- _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(),
- _endpoint,
- toConnectionState(_state),
- _observer);
+ Debug.Assert(_instance.getObserver() != null);
+ _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ toConnectionState(_state),
+ _observer);
if(_observer != null)
{
_observer.attach();
@@ -2062,16 +2062,16 @@ namespace Ice
}
}
- if(_instance.initializationData().observer != null)
+ if(_instance.getObserver() != null)
{
ConnectionState oldState = toConnectionState(_state);
ConnectionState newState = toConnectionState(state);
if(oldState != newState)
{
- _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(),
- _endpoint,
- newState,
- _observer);
+ _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ newState,
+ _observer);
if(_observer != null)
{
_observer.attach();
diff --git a/cs/src/Ice/EndpointHostResolver.cs b/cs/src/Ice/EndpointHostResolver.cs
index e65fc6fbf0c..7ef8a44b2bf 100644
--- a/cs/src/Ice/EndpointHostResolver.cs
+++ b/cs/src/Ice/EndpointHostResolver.cs
@@ -54,7 +54,7 @@ namespace IceInternal
}
}
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
Ice.Instrumentation.Observer observer = null;
if(obsv != null)
{
@@ -133,7 +133,7 @@ namespace IceInternal
entry.endpoint = endpoint;
entry.callback = callback;
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
if(obsv != null)
{
entry.observer = obsv.getEndpointLookupObserver(endpoint);
@@ -273,7 +273,7 @@ namespace IceInternal
_m.Lock();
try
{
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
if(obsv != null)
{
_observer = obsv.getThreadObserver("Communicator",
diff --git a/cs/src/Ice/Incoming.cs b/cs/src/Ice/Incoming.cs
index c185772dbdc..c4b723f40cc 100644
--- a/cs/src/Ice/Incoming.cs
+++ b/cs/src/Ice/Incoming.cs
@@ -689,7 +689,7 @@ namespace IceInternal
current_.ctx[first] = second;
}
- Ice.Instrumentation.CommunicatorObserver obsv = instance_.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = instance_.getObserver();
if(obsv != null)
{
// Read the encapsulation size.
diff --git a/cs/src/Ice/Instance.cs b/cs/src/Ice/Instance.cs
index 06ff92c55ff..7fe0cb447a3 100644
--- a/cs/src/Ice/Instance.cs
+++ b/cs/src/Ice/Instance.cs
@@ -592,6 +592,12 @@ namespace IceInternal
}
}
+ public Ice.Instrumentation.CommunicatorObserver
+ getObserver()
+ {
+ return _observer;
+ }
+
public void
setDefaultLocator(Ice.LocatorPrx locator)
{
@@ -870,18 +876,20 @@ namespace IceInternal
// Setup the communicator observer only if the user didn't already set an
// Ice observer resolver and if the admininistrative endpoints are set.
//
- if(_initData.observer == null &&
- (_adminFacetFilter.Count == 0 || _adminFacetFilter.Contains("Metrics")) &&
+ if((_adminFacetFilter.Count == 0 || _adminFacetFilter.Contains("Metrics")) &&
_initData.properties.getProperty("Ice.Admin.Endpoints").Length > 0)
{
- CommunicatorObserverI observer = new CommunicatorObserverI(admin);
- _initData.observer = observer;
+ _observer = new CommunicatorObserverI(admin, _initData.observer);
//
// Make sure the admin plugin receives property updates.
//
props.addUpdateCallback(admin);
}
+ else
+ {
+ _observer = initData.observer;
+ }
}
catch(Ice.LocalException)
{
@@ -904,9 +912,9 @@ namespace IceInternal
//
// Set observer updater
//
- if(_initData.observer != null)
+ if(_observer != null)
{
- _initData.observer.setObserverUpdater(new ObserverUpdaterI(this));
+ _observer.setObserverUpdater(new ObserverUpdaterI(this));
}
//
@@ -1265,6 +1273,7 @@ namespace IceInternal
private int _clientACM; // Immutable, not reset by destroy().
private int _serverACM; // Immutable, not reset by destroy().
private Ice.ImplicitContextI _implicitContext; // Immutable
+ private Ice.Instrumentation.CommunicatorObserver _observer; // Immutable
private RouterManager _routerManager;
private LocatorManager _locatorManager;
private ReferenceFactory _referenceFactory;
diff --git a/cs/src/Ice/InstrumentationI.cs b/cs/src/Ice/InstrumentationI.cs
index f04c5370861..fa1595115e0 100644
--- a/cs/src/Ice/InstrumentationI.cs
+++ b/cs/src/Ice/InstrumentationI.cs
@@ -16,6 +16,101 @@ namespace IceInternal
using IceMX;
+ public class ObserverWithDelegate<T, O> : Observer<T>
+ where T : Metrics, new()
+ where O : Ice.Instrumentation.Observer
+ {
+ override public void
+ attach()
+ {
+ base.attach();
+ if(delegate_ != null)
+ {
+ delegate_.attach();
+ }
+ }
+
+ override public void
+ detach()
+ {
+ base.detach();
+ if(delegate_ != null)
+ {
+ delegate_.detach();
+ }
+ }
+
+ override public void
+ failed(string exceptionName)
+ {
+ base.failed(exceptionName);
+ if(delegate_ != null)
+ {
+ delegate_.failed(exceptionName);
+ }
+ }
+
+ public O
+ getDelegate()
+ {
+ return delegate_;
+ }
+
+ public void
+ setDelegate(O del)
+ {
+ delegate_ = del;
+ }
+
+ public Observer getObserver<S, ObserverImpl, Observer>(string mapName, MetricsHelper<S> helper, Observer del)
+ where S : Metrics, new()
+ where ObserverImpl : ObserverWithDelegate<S, Observer>, Observer, new()
+ where Observer : Ice.Instrumentation.Observer
+ {
+ ObserverImpl obsv = base.getObserver<S, ObserverImpl>(mapName, helper);
+ if(obsv != null)
+ {
+ obsv.setDelegate(del);
+ return (Observer)obsv;
+ }
+ return del;
+ }
+
+ protected O delegate_;
+ };
+
+ public class ObserverFactoryWithDelegate<T, OImpl, O> : ObserverFactory<T, OImpl>
+ where T : Metrics, new()
+ where OImpl : ObserverWithDelegate<T, O>, O, new()
+ where O : Ice.Instrumentation.Observer
+ {
+ public ObserverFactoryWithDelegate(IceInternal.MetricsAdminI metrics, string name) : base(metrics, name)
+ {
+ }
+
+ public O getObserver(MetricsHelper<T> helper, O del)
+ {
+ OImpl o = base.getObserver(helper);
+ if(o != null)
+ {
+ o.setDelegate(del);
+ return o;
+ }
+ return del;
+ }
+
+ public O getObserver(MetricsHelper<T> helper, object observer, O del)
+ {
+ OImpl o = base.getObserver(helper, observer);
+ if(o != null)
+ {
+ o.setDelegate(del);
+ return o;
+ }
+ return del;
+ }
+ }
+
static class AttrsUtil
{
public static void
@@ -645,22 +740,31 @@ namespace IceInternal
private Ice.EndpointInfo _endpointInfo;
};
- public class ObserverI :Observer<Metrics>
+ public class ObserverWithDelegateI : ObserverWithDelegate<Metrics, Ice.Instrumentation.Observer>
{
};
- public class ConnectionObserverI : Observer<ConnectionMetrics>, Ice.Instrumentation.ConnectionObserver
+ public class ConnectionObserverI : ObserverWithDelegate<ConnectionMetrics, Ice.Instrumentation.ConnectionObserver>,
+ Ice.Instrumentation.ConnectionObserver
{
public void sentBytes(int num)
{
_sentBytes = num;
forEach(sentBytesUpdate);
+ if(delegate_ != null)
+ {
+ delegate_.sentBytes(num);
+ }
}
public void receivedBytes(int num)
{
_receivedBytes = num;
forEach(receivedBytesUpdate);
+ if(delegate_ != null)
+ {
+ delegate_.receivedBytes(num);
+ }
}
private void sentBytesUpdate(ConnectionMetrics v)
@@ -677,12 +781,17 @@ namespace IceInternal
private int _receivedBytes;
};
- public class DispatchObserverI : Observer<DispatchMetrics>, Ice.Instrumentation.DispatchObserver
+ public class DispatchObserverI : ObserverWithDelegate<DispatchMetrics, Ice.Instrumentation.DispatchObserver>,
+ Ice.Instrumentation.DispatchObserver
{
public void
userException()
{
forEach(userException);
+ if(delegate_ != null)
+ {
+ delegate_.userException();
+ }
}
public void reply(int size)
@@ -690,6 +799,10 @@ namespace IceInternal
forEach((DispatchMetrics v) => {
v.replySize += size;
});
+ if(delegate_ != null)
+ {
+ delegate_.reply(size);
+ }
}
private void userException(DispatchMetrics v)
@@ -698,35 +811,56 @@ namespace IceInternal
}
}
- public class RemoteObserverI : Observer<RemoteMetrics>, Ice.Instrumentation.RemoteObserver
+ public class RemoteObserverI : ObserverWithDelegate<RemoteMetrics, Ice.Instrumentation.RemoteObserver>,
+ Ice.Instrumentation.RemoteObserver
{
public void reply(int size)
{
forEach((RemoteMetrics v) => {
v.replySize += size;
});
+ if(delegate_ != null)
+ {
+ delegate_.reply(size);
+ }
}
}
- public class InvocationObserverI : Observer<InvocationMetrics>, Ice.Instrumentation.InvocationObserver
+ public class InvocationObserverI : ObserverWithDelegate<InvocationMetrics, Ice.Instrumentation.InvocationObserver>,
+ Ice.Instrumentation.InvocationObserver
{
public void
userException()
{
forEach(userException);
+ if(delegate_ != null)
+ {
+ delegate_.userException();
+ }
}
public void
retried()
{
forEach(incrementRetry);
+ if(delegate_ != null)
+ {
+ delegate_.retried();
+ }
}
public Ice.Instrumentation.RemoteObserver getRemoteObserver(Ice.ConnectionInfo con, Ice.Endpoint endpt,
int requestId, int size)
{
- return getObserver<RemoteMetrics, RemoteObserverI>("Remote",
- new RemoteInvocationHelper(con, endpt, requestId, size));
+ Ice.Instrumentation.RemoteObserver del = null;
+ if(delegate_ != null)
+ {
+ del = delegate_.getRemoteObserver(con, endpt, requestId, size);
+ }
+ return getObserver<RemoteMetrics, RemoteObserverI,
+ Ice.Instrumentation.RemoteObserver>("Remote",
+ new RemoteInvocationHelper(con, endpt, requestId, size),
+ del);
}
private void incrementRetry(InvocationMetrics v)
@@ -740,13 +874,18 @@ namespace IceInternal
}
}
- public class ThreadObserverI : Observer<ThreadMetrics>, Ice.Instrumentation.ThreadObserver
+ public class ThreadObserverI : ObserverWithDelegate<ThreadMetrics, Ice.Instrumentation.ThreadObserver>,
+ Ice.Instrumentation.ThreadObserver
{
public void stateChanged(Ice.Instrumentation.ThreadState oldState, Ice.Instrumentation.ThreadState newState)
{
_oldState = oldState;
_newState = newState;
forEach(threadStateUpdate);
+ if(delegate_ != null)
+ {
+ delegate_.stateChanged(oldState, newState);
+ }
}
private void threadStateUpdate(ThreadMetrics v)
@@ -787,15 +926,27 @@ namespace IceInternal
public class CommunicatorObserverI : Ice.Instrumentation.CommunicatorObserver
{
- public CommunicatorObserverI(IceInternal.MetricsAdminI metrics)
+ public CommunicatorObserverI(IceInternal.MetricsAdminI metrics) : this(metrics, null)
+ {
+ }
+
+ public CommunicatorObserverI(IceInternal.MetricsAdminI metrics,
+ Ice.Instrumentation.CommunicatorObserver del)
{
_metrics = metrics;
- _connections = new ObserverFactory<ConnectionMetrics, ConnectionObserverI>(metrics, "Connection");
- _dispatch = new ObserverFactory<DispatchMetrics, DispatchObserverI>(metrics, "Dispatch");
- _invocations = new ObserverFactory<InvocationMetrics, InvocationObserverI>(metrics, "Invocation");
- _threads = new ObserverFactory<ThreadMetrics, ThreadObserverI>(metrics, "Thread");
- _connects = new ObserverFactory<Metrics, ObserverI>(metrics, "ConnectionEstablishment");
- _endpointLookups = new ObserverFactory<Metrics, ObserverI>(metrics, "EndpointLookup");
+ _delegate = del;
+ _connections = new ObserverFactoryWithDelegate<ConnectionMetrics, ConnectionObserverI,
+ Ice.Instrumentation.ConnectionObserver>(metrics, "Connection");
+ _dispatch = new ObserverFactoryWithDelegate<DispatchMetrics, DispatchObserverI,
+ Ice.Instrumentation.DispatchObserver>(metrics, "Dispatch");
+ _invocations = new ObserverFactoryWithDelegate<InvocationMetrics, InvocationObserverI,
+ Ice.Instrumentation.InvocationObserver>(metrics, "Invocation");
+ _threads = new ObserverFactoryWithDelegate<ThreadMetrics, ThreadObserverI,
+ Ice.Instrumentation.ThreadObserver>(metrics, "Thread");
+ _connects = new ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI,
+ Ice.Instrumentation.Observer>(metrics, "ConnectionEstablishment");
+ _endpointLookups = new ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI,
+ Ice.Instrumentation.Observer>(metrics, "EndpointLookup");
try
{
@@ -811,7 +962,19 @@ namespace IceInternal
{
if(_connects.isEnabled())
{
- return _connects.getObserver(new EndpointHelper(endpt, connector));
+ try
+ {
+ Ice.Instrumentation.Observer del = null;
+ if(_delegate != null)
+ {
+ del = _delegate.getConnectionEstablishmentObserver(endpt, connector);
+ }
+ return _connects.getObserver(new EndpointHelper(endpt, connector), del);
+ }
+ catch(Exception ex)
+ {
+ _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex);
+ }
}
return null;
}
@@ -820,29 +983,68 @@ namespace IceInternal
{
if(_endpointLookups.isEnabled())
{
- return _endpointLookups.getObserver(new EndpointHelper(endpt));
+ try
+ {
+ Ice.Instrumentation.Observer del = null;
+ if(_delegate != null)
+ {
+ del = _delegate.getEndpointLookupObserver(endpt);
+ }
+ return _endpointLookups.getObserver(new EndpointHelper(endpt), del);
+ }
+ catch(Exception ex)
+ {
+ _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex);
+ }
}
return null;
}
- public Ice.Instrumentation.ConnectionObserver getConnectionObserver(Ice.ConnectionInfo c, Ice.Endpoint e,
+ public Ice.Instrumentation.ConnectionObserver getConnectionObserver(Ice.ConnectionInfo c,
+ Ice.Endpoint e,
Ice.Instrumentation.ConnectionState s,
- Ice.Instrumentation.ConnectionObserver o)
+ Ice.Instrumentation.ConnectionObserver obsv)
{
if(_connections.isEnabled())
{
- return _connections.getObserver(new ConnectionHelper(c, e, s), o);
+ try
+ {
+ Ice.Instrumentation.ConnectionObserver del = null;
+ ConnectionObserverI o = obsv is ConnectionObserverI ? (ConnectionObserverI)obsv : null;
+ if(_delegate != null)
+ {
+ del = _delegate.getConnectionObserver(c, e, s, o != null ? o.getDelegate() : obsv);
+ }
+ return _connections.getObserver(new ConnectionHelper(c, e, s), obsv, del);
+ }
+ catch(Exception ex)
+ {
+ _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex);
+ }
}
return null;
}
public Ice.Instrumentation.ThreadObserver getThreadObserver(string parent, string id,
Ice.Instrumentation.ThreadState s,
- Ice.Instrumentation.ThreadObserver o)
+ Ice.Instrumentation.ThreadObserver obsv)
{
if(_threads.isEnabled())
{
- return _threads.getObserver(new ThreadHelper(parent, id, s), o);
+ try
+ {
+ Ice.Instrumentation.ThreadObserver del = null;
+ ThreadObserverI o = obsv is ThreadObserverI ? (ThreadObserverI)obsv : null;
+ if(_delegate != null)
+ {
+ del = _delegate.getThreadObserver(parent, id, s, o != null ? o.getDelegate() : obsv);
+ }
+ return _threads.getObserver(new ThreadHelper(parent, id, s), obsv, del);
+ }
+ catch(Exception ex)
+ {
+ _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex);
+ }
}
return null;
}
@@ -852,7 +1054,19 @@ namespace IceInternal
{
if(_invocations.isEnabled())
{
- return _invocations.getObserver(new InvocationHelper(prx, operation, ctx));
+ try
+ {
+ Ice.Instrumentation.InvocationObserver del = null;
+ if(_delegate != null)
+ {
+ del = _delegate.getInvocationObserver(prx, operation, ctx);
+ }
+ return _invocations.getObserver(new InvocationHelper(prx, operation, ctx), del);
+ }
+ catch(Exception ex)
+ {
+ _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex);
+ }
}
return null;
}
@@ -861,7 +1075,19 @@ namespace IceInternal
{
if(_dispatch.isEnabled())
{
- return _dispatch.getObserver(new DispatchHelper(c, size));
+ try
+ {
+ Ice.Instrumentation.DispatchObserver del = null;
+ if(_delegate != null)
+ {
+ del = _delegate.getDispatchObserver(c, size);
+ }
+ return _dispatch.getObserver(new DispatchHelper(c, size), del);
+ }
+ catch(Exception ex)
+ {
+ _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex);
+ }
}
return null;
}
@@ -870,6 +1096,10 @@ namespace IceInternal
{
_connections.setUpdater(updater.updateConnectionObservers);
_threads.setUpdater(updater.updateThreadObservers);
+ if(_delegate != null)
+ {
+ _delegate.setObserverUpdater(updater);
+ }
}
public IceInternal.MetricsAdminI getMetricsAdmin()
@@ -878,11 +1108,18 @@ namespace IceInternal
}
readonly private IceInternal.MetricsAdminI _metrics;
- readonly private ObserverFactory<ConnectionMetrics, ConnectionObserverI> _connections;
- readonly private ObserverFactory<DispatchMetrics, DispatchObserverI> _dispatch;
- readonly private ObserverFactory<InvocationMetrics, InvocationObserverI> _invocations;
- readonly private ObserverFactory<ThreadMetrics, ThreadObserverI> _threads;
- readonly private ObserverFactory<Metrics, ObserverI> _connects;
- readonly private ObserverFactory<Metrics, ObserverI> _endpointLookups;
+ readonly private Ice.Instrumentation.CommunicatorObserver _delegate;
+ readonly private ObserverFactoryWithDelegate<ConnectionMetrics, ConnectionObserverI,
+ Ice.Instrumentation.ConnectionObserver> _connections;
+ readonly private ObserverFactoryWithDelegate<DispatchMetrics, DispatchObserverI,
+ Ice.Instrumentation.DispatchObserver> _dispatch;
+ readonly private ObserverFactoryWithDelegate<InvocationMetrics, InvocationObserverI,
+ Ice.Instrumentation.InvocationObserver> _invocations;
+ readonly private ObserverFactoryWithDelegate<ThreadMetrics, ThreadObserverI,
+ Ice.Instrumentation.ThreadObserver> _threads;
+ readonly private ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI,
+ Ice.Instrumentation.Observer> _connects;
+ readonly private ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI,
+ Ice.Instrumentation.Observer> _endpointLookups;
}
} \ No newline at end of file
diff --git a/cs/src/Ice/MetricsObserverI.cs b/cs/src/Ice/MetricsObserverI.cs
index ee4d70a81c0..d92ff87ccd5 100644
--- a/cs/src/Ice/MetricsObserverI.cs
+++ b/cs/src/Ice/MetricsObserverI.cs
@@ -214,12 +214,12 @@ namespace IceMX
{
public delegate void MetricsUpdate(T m);
- public void attach()
+ virtual public void attach()
{
Start();
}
- public void detach()
+ virtual public void detach()
{
Stop();
long lifetime = _previousDelay + (long)(ElapsedTicks / (Frequency / 1000000.0));
@@ -229,7 +229,7 @@ namespace IceMX
}
}
- public void failed(string exceptionName)
+ virtual public void failed(string exceptionName)
{
foreach(MetricsMap<T>.Entry e in _objects)
{
@@ -349,7 +349,14 @@ namespace IceMX
lock(this)
{
List<MetricsMap<T>.Entry> metricsObjects = null;
- O old = (O)observer;
+ O old = null;
+ try
+ {
+ old = (O)observer;
+ }
+ catch(InvalidCastException)
+ {
+ }
foreach(MetricsMap<T> m in _maps)
{
MetricsMap<T>.Entry e = m.getMatching(helper, old != null ? old.getEntry(m) : null);
diff --git a/cs/src/Ice/ObserverHelper.cs b/cs/src/Ice/ObserverHelper.cs
index 677350499af..dafa0188c60 100644
--- a/cs/src/Ice/ObserverHelper.cs
+++ b/cs/src/Ice/ObserverHelper.cs
@@ -16,7 +16,7 @@ namespace IceInternal
{
static public InvocationObserver get(Instance instance, string op)
{
- CommunicatorObserver obsv = instance.initializationData().observer;
+ CommunicatorObserver obsv = instance.getObserver();
if(obsv != null)
{
InvocationObserver observer = obsv.getInvocationObserver(null, op, _emptyContext);
@@ -36,8 +36,7 @@ namespace IceInternal
static public InvocationObserver get(Ice.ObjectPrx proxy, string op, Dictionary<string, string> context)
{
- CommunicatorObserver obsv =
- ((Ice.ObjectPrxHelperBase)proxy).reference__().getInstance().initializationData().observer;
+ CommunicatorObserver obsv = ((Ice.ObjectPrxHelperBase)proxy).reference__().getInstance().getObserver();
if(obsv != null)
{
InvocationObserver observer;
diff --git a/cs/src/Ice/ThreadPool.cs b/cs/src/Ice/ThreadPool.cs
index 582e5d1ed17..4a00255cdfd 100644
--- a/cs/src/Ice/ThreadPool.cs
+++ b/cs/src/Ice/ThreadPool.cs
@@ -761,7 +761,7 @@ namespace IceInternal
public void updateObserver()
{
// Must be called with the thread pool mutex locked
- Ice.Instrumentation.CommunicatorObserver obsv = _threadPool._instance.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _threadPool._instance.getObserver();
if(obsv != null)
{
_observer = obsv.getThreadObserver(_threadPool._prefix, _name, _state, _observer);
diff --git a/cs/test/Ice/metrics/AllTests.cs b/cs/test/Ice/metrics/AllTests.cs
index e091fca7f35..99c8d9353bd 100644
--- a/cs/test/Ice/metrics/AllTests.cs
+++ b/cs/test/Ice/metrics/AllTests.cs
@@ -377,9 +377,9 @@ public class AllTests : TestCommon.TestApp
}
#if SILVERLIGHT
- override public void run(Ice.Communicator communicator)
+ override public void run(Ice.Communicator communicator, CommunicatorObserverI obsv)
#else
- public static MetricsPrx allTests(Ice.Communicator communicator)
+ public static MetricsPrx allTests(Ice.Communicator communicator, CommunicatorObserverI obsv)
#endif
{
MetricsPrx metrics = MetricsPrxHelper.checkedCast(communicator.stringToProxy("metrics:default -p 12010"));
@@ -1057,6 +1057,41 @@ public class AllTests : TestCommon.TestApp
WriteLine("ok");
+ Write("testing instrumentation observer delegate... ");
+ Flush();
+
+ test(obsv.threadObserver.total > 0);
+ test(obsv.connectionObserver.total > 0);
+ test(obsv.connectionEstablishmentObserver.total > 0);
+ test(obsv.endpointLookupObserver.total > 0);
+ test(obsv.dispatchObserver.total > 0);
+ test(obsv.invocationObserver.total > 0);
+ test(obsv.invocationObserver.remoteObserver.total > 0);
+
+ test(obsv.threadObserver.current > 0);
+ test(obsv.connectionObserver.current > 0);
+ test(obsv.connectionEstablishmentObserver.current == 0);
+ test(obsv.endpointLookupObserver.current == 0);
+ test(obsv.dispatchObserver.current == 0);
+ test(obsv.invocationObserver.current == 0);
+ test(obsv.invocationObserver.remoteObserver.current == 0);
+
+ test(obsv.threadObserver.failedCount == 0);
+ test(obsv.connectionObserver.failedCount > 0);
+ test(obsv.connectionEstablishmentObserver.failedCount > 0);
+ test(obsv.endpointLookupObserver.failedCount > 0);
+ //test(obsv.dispatchObserver.failedCount > 0);
+ test(obsv.invocationObserver.failedCount > 0);
+ test(obsv.invocationObserver.remoteObserver.failedCount > 0);
+
+ test(obsv.threadObserver.states > 0);
+ test(obsv.connectionObserver.received > 0 && obsv.connectionObserver.sent > 0);
+ //test(obsv.dispatchObserver.userExceptionCount > 0);
+ test(obsv.invocationObserver.userExceptionCount > 0 && obsv.invocationObserver.retriedCount > 0);
+ test(obsv.invocationObserver.remoteObserver.replySize > 0);
+
+ WriteLine("ok");
+
#if SILVERLIGHT
metrics.shutdown();
#else
diff --git a/cs/test/Ice/metrics/Client.cs b/cs/test/Ice/metrics/Client.cs
index 566db6a836d..657b21adc94 100644
--- a/cs/test/Ice/metrics/Client.cs
+++ b/cs/test/Ice/metrics/Client.cs
@@ -21,7 +21,7 @@ public class Client
{
private static int run(string[] args, Ice.Communicator communicator)
{
- Test.MetricsPrx metrics = AllTests.allTests(communicator);
+ Test.MetricsPrx metrics = AllTests.allTests(communicator, _observer);
metrics.shutdown();
return 0;
}
@@ -52,6 +52,7 @@ public class Client
//
initData.properties.setProperty("Ice.FactoryAssemblies", "client");
#endif
+ initData.observer = _observer;
communicator = Ice.Util.initialize(ref args, initData);
status = run(args, communicator);
}
@@ -76,4 +77,6 @@ public class Client
return status;
}
+
+ static CommunicatorObserverI _observer = new CommunicatorObserverI();
}
diff --git a/cs/test/Ice/metrics/InstrumentationI.cs b/cs/test/Ice/metrics/InstrumentationI.cs
new file mode 100644
index 00000000000..f6811e37fc7
--- /dev/null
+++ b/cs/test/Ice/metrics/InstrumentationI.cs
@@ -0,0 +1,376 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+
+using Test;
+
+public class ObserverI : Ice.Instrumentation.Observer
+{
+ virtual public void
+ reset()
+ {
+ lock(this)
+ {
+ total = 0;
+ current = 0;
+ failedCount = 0;
+ }
+ }
+
+ public void
+ attach()
+ {
+ lock(this)
+ {
+ ++total;
+ ++current;
+ }
+ }
+ public void
+ detach()
+ {
+ lock(this)
+ {
+ --current;
+ }
+ }
+ public void
+ failed(String s)
+ {
+ lock(this)
+ {
+ ++failedCount;
+ }
+ }
+
+ public int total;
+ public int current;
+ public int failedCount;
+};
+
+public class RemoteObserverI : ObserverI, Ice.Instrumentation.RemoteObserver
+{
+ override public void
+ reset()
+ {
+ lock(this)
+ {
+ base.reset();
+ replySize = 0;
+ }
+ }
+
+ public void
+ reply(int s)
+ {
+ lock(this)
+ {
+ replySize += s;
+ }
+ }
+
+ public int replySize;
+};
+
+public class InvocationObserverI : ObserverI , Ice.Instrumentation.InvocationObserver
+{
+ override public void
+ reset()
+ {
+ lock(this)
+ {
+ base.reset();
+ retriedCount = 0;
+ userExceptionCount = 0;
+ if(remoteObserver != null)
+ {
+ remoteObserver.reset();
+ }
+ }
+ }
+
+ public void
+ retried()
+ {
+ lock(this)
+ {
+ ++retriedCount;
+ }
+ }
+
+ public void
+ userException()
+ {
+ lock(this)
+ {
+ ++userExceptionCount;
+ }
+ }
+
+ public Ice.Instrumentation.RemoteObserver
+ getRemoteObserver(Ice.ConnectionInfo c, Ice.Endpoint e, int a, int b)
+ {
+ lock(this)
+ {
+ if(remoteObserver == null)
+ {
+ remoteObserver = new RemoteObserverI();
+ remoteObserver.reset();
+ }
+ return remoteObserver;
+ }
+ }
+
+ public int userExceptionCount;
+ public int retriedCount;
+
+ public RemoteObserverI remoteObserver = null;
+};
+
+public class DispatchObserverI : ObserverI , Ice.Instrumentation.DispatchObserver
+{
+ override public void
+ reset()
+ {
+ lock(this)
+ {
+ base.reset();
+ userExceptionCount = 0;
+ replySize = 0;
+ }
+ }
+
+ public void
+ userException()
+ {
+ lock(this)
+ {
+ ++userExceptionCount;
+ }
+ }
+
+ public void
+ reply(int s)
+ {
+ lock(this)
+ {
+ replySize += s;
+ }
+ }
+
+ public int userExceptionCount;
+ public int replySize;
+};
+
+public class ConnectionObserverI : ObserverI , Ice.Instrumentation.ConnectionObserver
+{
+ override public void
+ reset()
+ {
+ lock(this)
+ {
+ base.reset();
+ received = 0;
+ sent = 0;
+ }
+ }
+
+ public void
+ sentBytes(int s)
+ {
+ lock(this)
+ {
+ sent += s;
+ }
+ }
+
+ public void
+ receivedBytes(int s)
+ {
+ lock(this)
+ {
+ received += s;
+ }
+ }
+
+ public int sent;
+ public int received;
+};
+
+public class ThreadObserverI : ObserverI , Ice.Instrumentation.ThreadObserver
+{
+ override public void
+ reset()
+ {
+ lock(this)
+ {
+ base.reset();
+ states = 0;
+ }
+ }
+
+ public void
+ stateChanged(Ice.Instrumentation.ThreadState o, Ice.Instrumentation.ThreadState n)
+ {
+ lock(this)
+ {
+ ++states;
+ }
+ }
+
+ public int states;
+};
+
+public class CommunicatorObserverI : Ice.Instrumentation.CommunicatorObserver
+{
+ public void
+ setObserverUpdater(Ice.Instrumentation.ObserverUpdater u)
+ {
+ lock(this)
+ {
+ updater = u;
+ }
+ }
+
+ public Ice.Instrumentation.Observer
+ getConnectionEstablishmentObserver(Ice.Endpoint e, String s)
+ {
+ lock(this)
+ {
+ if(connectionEstablishmentObserver == null)
+ {
+ connectionEstablishmentObserver = new ObserverI();
+ connectionEstablishmentObserver.reset();
+ }
+ return connectionEstablishmentObserver;
+ }
+ }
+
+
+ public Ice.Instrumentation.Observer
+ getEndpointLookupObserver(Ice.Endpoint e)
+ {
+ lock(this)
+ {
+ if(endpointLookupObserver == null)
+ {
+ endpointLookupObserver = new ObserverI();
+ endpointLookupObserver.reset();
+ }
+ return endpointLookupObserver;
+ }
+ }
+
+ public Ice.Instrumentation.ConnectionObserver
+ getConnectionObserver(Ice.ConnectionInfo c,
+ Ice.Endpoint e,
+ Ice.Instrumentation.ConnectionState s,
+ Ice.Instrumentation.ConnectionObserver old)
+ {
+ lock(this)
+ {
+ Debug.Assert(old == null || old is ConnectionObserverI);
+ if(connectionObserver == null)
+ {
+ connectionObserver = new ConnectionObserverI();
+ connectionObserver.reset();
+ }
+ return connectionObserver;
+ }
+ }
+
+ public Ice.Instrumentation.ThreadObserver
+ getThreadObserver(String p, String id, Ice.Instrumentation.ThreadState s,
+ Ice.Instrumentation.ThreadObserver old)
+ {
+ lock(this)
+ {
+ Debug.Assert(old == null || old is ThreadObserverI);
+ if(threadObserver == null)
+ {
+ threadObserver = new ThreadObserverI();
+ threadObserver.reset();
+ }
+ return threadObserver;
+ }
+ }
+
+ public Ice.Instrumentation.InvocationObserver
+ getInvocationObserver(Ice.ObjectPrx p, String op, Dictionary<String, String> ctx)
+ {
+ lock(this)
+ {
+ if(invocationObserver == null)
+ {
+ invocationObserver = new InvocationObserverI();
+ invocationObserver.reset();
+ }
+ return invocationObserver;
+ }
+ }
+
+ public Ice.Instrumentation.DispatchObserver
+ getDispatchObserver(Ice.Current current, int s)
+ {
+ lock(this)
+ {
+ if(dispatchObserver == null)
+ {
+ dispatchObserver = new DispatchObserverI();
+ dispatchObserver.reset();
+ }
+ return dispatchObserver;
+ }
+ }
+
+ void
+ reset()
+ {
+ lock(this)
+ {
+ if(connectionEstablishmentObserver != null)
+ {
+ connectionEstablishmentObserver.reset();
+ }
+ if(endpointLookupObserver != null)
+ {
+ endpointLookupObserver.reset();
+ }
+ if(connectionObserver != null)
+ {
+ connectionObserver.reset();
+ }
+ if(threadObserver != null)
+ {
+ threadObserver.reset();
+ }
+ if(invocationObserver != null)
+ {
+ invocationObserver.reset();
+ }
+ if(dispatchObserver != null)
+ {
+ dispatchObserver.reset();
+ }
+ }
+ }
+
+ protected Ice.Instrumentation.ObserverUpdater updater;
+
+ public ObserverI connectionEstablishmentObserver;
+ public ObserverI endpointLookupObserver;
+ public ConnectionObserverI connectionObserver;
+ public ThreadObserverI threadObserver;
+ public InvocationObserverI invocationObserver;
+ public DispatchObserverI dispatchObserver;
+};
+
diff --git a/cs/test/Ice/metrics/Makefile b/cs/test/Ice/metrics/Makefile
index 9fc55e3d052..5d5b2abf1bc 100644
--- a/cs/test/Ice/metrics/Makefile
+++ b/cs/test/Ice/metrics/Makefile
@@ -11,7 +11,7 @@ top_srcdir = ../../..
TARGETS = client.exe server.exe serveramd.exe
-C_SRCS = AllTests.cs Client.cs ../../TestCommon/TestApp.cs
+C_SRCS = AllTests.cs Client.cs InstrumentationI.cs ../../TestCommon/TestApp.cs
S_SRCS = MetricsI.cs Server.cs
SAMD_SRCS = MetricsAMDI.cs Server.cs
diff --git a/cs/test/Ice/metrics/Makefile.mak b/cs/test/Ice/metrics/Makefile.mak
index d50449baffc..c488a767153 100644
--- a/cs/test/Ice/metrics/Makefile.mak
+++ b/cs/test/Ice/metrics/Makefile.mak
@@ -11,7 +11,7 @@ top_srcdir = ..\..\..
TARGETS = client.exe server.exe serveramd.exe
-C_SRCS = AllTests.cs Client.cs ..\..\TestCommon\TestApp.cs
+C_SRCS = AllTests.cs Client.cs InstrumentationI.cs ..\..\TestCommon\TestApp.cs
S_SRCS = MetricsI.cs Server.cs
SAMD_SRCS = MetricsAMDI.cs Server.cs
diff --git a/distribution/lib/DistUtils.py b/distribution/lib/DistUtils.py
index baaca0d5f49..82916337630 100644
--- a/distribution/lib/DistUtils.py
+++ b/distribution/lib/DistUtils.py
@@ -435,7 +435,7 @@ def fixPermission(dest):
def tarArchive(dir, verbose = False, archiveDir = None):
dist = os.path.basename(dir)
- sys.stdout.write(" creating " + dist + ".tar.gz ...")
+ sys.stdout.write(" creating " + dist + ".tar.gz... ")
sys.stdout.flush()
cwd = os.getcwd()
@@ -486,7 +486,7 @@ def untarArchive(archive, verbose = False, archiveDir = None):
def zipArchive(dir, verbose = False, archiveDir = None):
dist = os.path.basename(dir)
- sys.stdout.write(" creating " + dist + ".zip ...")
+ sys.stdout.write(" creating " + dist + ".zip... ")
sys.stdout.flush()
cwd = os.getcwd()
diff --git a/distribution/makedist.py b/distribution/makedist.py
index f473cc8765e..c60295cf544 100755
--- a/distribution/makedist.py
+++ b/distribution/makedist.py
@@ -15,10 +15,53 @@ from DistUtils import *
import FixUtil
#
+# Windows files that will be excluded from Unix source distributions.
+#
+excludeWindowsFiles = [ \
+ "/vsaddin/",
+ "*.rc",
+ "*.sln",
+ "*.csproj",
+ "*.vbproj",
+ "*.vcproj",
+ "*.vcxproj",
+ "*.vcxproj.filters",
+ "Make*mak*",
+ "Make.rules.msvc",
+ ".depend.mak",
+ "*.exe.config",
+ "MSG00001.bin",
+ "/cpp/test/WinRT",
+ "/cpp/demo/**/generated",
+ "/cpp/demo/**/MFC",
+ "/cpp/**/winrt"
+ "/cs/**/sl",
+ "/cs/**/compact",
+ "/cs/**/cf",
+]
+
+#
+# Unix files that will be excluded from Windows source distributions.
+#
+excludeUnixFiles = [ \
+]
+for l in ["/java", "/py", "/php", "/cs", "/cpp/demo"]:
+ excludeUnixFiles += [
+ l + "/**/Makefile",
+ l + "/**/Make.rules.",
+ l + "/**/Make.rules.cs",
+ l + "/**/Make.rules.php",
+ l + "/**/Make.rules",
+ l + "/**/Make.rules.Darwin",
+ l + "/**/Make.rules.Linux",
+ l + "/**/.depend"
+ ]
+
+#
# Files from the top-level, cpp, java and cs config directories to include in the demo
# source distribution config directory.
#
-configFiles = [ \
+demoConfigFiles = [ \
"Make.*", \
"common.xml", \
]
@@ -27,7 +70,7 @@ configFiles = [ \
# Files from the top-level certs directory to include in the demo distribution certs
# directory.
#
-certsFiles = [ \
+demoCertsFiles = [ \
"*.jks", \
"*.pem", \
"*.pfx", \
@@ -563,9 +606,9 @@ sys.stdout.flush()
copy("ICE_LICENSE", demoDir)
copy(os.path.join(distFilesDir, "src", "common", "README.DEMOS"), demoDir)
-copyMatchingFiles(os.path.join("certs"), os.path.join(demoDir, "certs"), certsFiles)
+copyMatchingFiles(os.path.join("certs"), os.path.join(demoDir, "certs"), demoCertsFiles)
for d in ["", "cpp", "java", "cs"]:
- copyMatchingFiles(os.path.join(d, "config"), os.path.join(demoDir, "config"), configFiles)
+ copyMatchingFiles(os.path.join(d, "config"), os.path.join(demoDir, "config"), demoConfigFiles)
copy(os.path.join(distFilesDir, "src", "common", "Make.rules"), os.path.join(demoDir, "config"), False)
copy(os.path.join(distFilesDir, "src", "common", "Make.rules.cs"), os.path.join(demoDir, "config"), False)
@@ -599,7 +642,7 @@ remove(os.path.join(srcDir, 'vb'))
# Windows demo distribution
copy(os.path.join(winDistFilesDir, "src", "common", "README.DEMOS.txt"), os.path.join(winDemoDir, "README.txt"))
-copyMatchingFiles(os.path.join(winSrcDir, "certs"), os.path.join(winDemoDir, "certs"), certsFiles)
+copyMatchingFiles(os.path.join(winSrcDir, "certs"), os.path.join(winDemoDir, "certs"), demoCertsFiles)
os.mkdir(os.path.join(winDemoDir, "config"))
diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java
index 82fd56d46a6..21b6525a05a 100644
--- a/java/src/Ice/CommunicatorI.java
+++ b/java/src/Ice/CommunicatorI.java
@@ -140,9 +140,10 @@ public final class CommunicatorI implements Communicator
return _instance.initializationData().stats;
}
- public Ice.Instrumentation.CommunicatorObserver getObserver()
+ public Ice.Instrumentation.CommunicatorObserver
+ getObserver()
{
- return _instance.initializationData().observer;
+ return _instance.getObserver();
}
public RouterPrx
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 32bf6308d05..bdd49e86ab7 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -257,11 +257,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return;
}
- assert(_instance.initializationData().observer != null);
- _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(),
- _endpoint,
- toConnectionState(_state),
- _observer);
+ assert(_instance.getObserver() != null);
+ _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ toConnectionState(_state),
+ _observer);
if(_observer != null)
{
_observer.attach();
@@ -1831,16 +1831,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- if(_instance.initializationData().observer != null)
+ if(_instance.getObserver() != null)
{
Ice.Instrumentation.ConnectionState oldState = toConnectionState(_state);
Ice.Instrumentation.ConnectionState newState = toConnectionState(state);
if(oldState != newState)
{
- _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(),
- _endpoint,
- newState,
- _observer);
+ _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ newState,
+ _observer);
if(_observer != null)
{
_observer.attach();
diff --git a/java/src/IceInternal/CommunicatorObserverI.java b/java/src/IceInternal/CommunicatorObserverI.java
index 056ef2cd007..78c44da0905 100644
--- a/java/src/IceInternal/CommunicatorObserverI.java
+++ b/java/src/IceInternal/CommunicatorObserverI.java
@@ -583,16 +583,27 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb
public
CommunicatorObserverI(IceInternal.MetricsAdminI metrics)
{
- _metrics = metrics;
+ this(metrics, null);
+ }
- _connections = new ObserverFactory<ConnectionMetrics, ConnectionObserverI>(metrics, "Connection",
- ConnectionMetrics.class);
- _dispatch = new ObserverFactory<DispatchMetrics, DispatchObserverI>(metrics, "Dispatch", DispatchMetrics.class);
- _invocations = new ObserverFactory<InvocationMetrics, InvocationObserverI>(metrics, "Invocation",
- InvocationMetrics.class);
- _threads = new ObserverFactory<ThreadMetrics, ThreadObserverI>(metrics, "Thread", ThreadMetrics.class);
- _connects = new ObserverFactory<Metrics, ObserverI>(metrics, "ConnectionEstablishment", Metrics.class);
- _endpointLookups = new ObserverFactory<Metrics, ObserverI>(metrics, "EndpointLookup", Metrics.class);
+ public
+ CommunicatorObserverI(IceInternal.MetricsAdminI metrics, Ice.Instrumentation.CommunicatorObserver delegate)
+ {
+ _metrics = metrics;
+ _delegate = delegate;
+
+ _connections = new ObserverFactoryWithDelegate<ConnectionMetrics, ConnectionObserverI,
+ Ice.Instrumentation.ConnectionObserver>(metrics, "Connection", ConnectionMetrics.class);
+ _dispatch = new ObserverFactoryWithDelegate<DispatchMetrics, DispatchObserverI,
+ Ice.Instrumentation.DispatchObserver>(metrics, "Dispatch", DispatchMetrics.class);
+ _invocations = new ObserverFactoryWithDelegate<InvocationMetrics, InvocationObserverI,
+ Ice.Instrumentation.InvocationObserver>(metrics, "Invocation", InvocationMetrics.class);
+ _threads = new ObserverFactoryWithDelegate<ThreadMetrics, ThreadObserverI,
+ Ice.Instrumentation.ThreadObserver>(metrics, "Thread", ThreadMetrics.class);
+ _connects = new ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI,
+ Ice.Instrumentation.Observer>(metrics, "ConnectionEstablishment", Metrics.class);
+ _endpointLookups = new ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI,
+ Ice.Instrumentation.Observer>(metrics, "EndpointLookup", Metrics.class);
try
{
@@ -612,11 +623,17 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb
{
try
{
- return _connects.getObserver(new EndpointHelper(endpt, connector), ObserverI.class);
+ Ice.Instrumentation.Observer delegate = null;
+ if(_delegate != null)
+ {
+ delegate = _delegate.getConnectionEstablishmentObserver(endpt, connector);
+ }
+ return _connects.getObserver(new EndpointHelper(endpt, connector), ObserverWithDelegateI.class,
+ delegate);
}
catch(Exception ex)
{
- _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex);
+ _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex));
}
}
return null;
@@ -629,11 +646,16 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb
{
try
{
- return _endpointLookups.getObserver(new EndpointHelper(endpt), ObserverI.class);
+ Ice.Instrumentation.Observer delegate = null;
+ if(_delegate != null)
+ {
+ delegate = _delegate.getEndpointLookupObserver(endpt);
+ }
+ return _endpointLookups.getObserver(new EndpointHelper(endpt), ObserverWithDelegateI.class, delegate);
}
catch(Exception ex)
{
- _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex);
+ _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex));
}
}
@@ -642,34 +664,47 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb
public Ice.Instrumentation.ConnectionObserver
getConnectionObserver(Ice.ConnectionInfo c, Ice.Endpoint e, Ice.Instrumentation.ConnectionState s,
- Ice.Instrumentation.ConnectionObserver o)
+ Ice.Instrumentation.ConnectionObserver observer)
{
if(_connections.isEnabled())
{
try
{
- return _connections.getObserver(new ConnectionHelper(c, e, s), o, ConnectionObserverI.class);
+ Ice.Instrumentation.ConnectionObserver delegate = null;
+ ConnectionObserverI o = observer instanceof ConnectionObserverI ? (ConnectionObserverI)observer : null;
+ if(_delegate != null)
+ {
+ delegate = _delegate.getConnectionObserver(c, e, s, o != null ? o.getDelegate() : observer);
+ }
+ return _connections.getObserver(new ConnectionHelper(c, e, s), o, ConnectionObserverI.class, delegate);
}
catch(Exception ex)
{
- _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex);
+ _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex));
}
}
return null;
}
public Ice.Instrumentation.ThreadObserver
- getThreadObserver(String parent, String id, Ice.Instrumentation.ThreadState s, Ice.Instrumentation.ThreadObserver o)
+ getThreadObserver(String parent, String id, Ice.Instrumentation.ThreadState s,
+ Ice.Instrumentation.ThreadObserver observer)
{
if(_threads.isEnabled())
{
try
{
- return _threads.getObserver(new ThreadHelper(parent, id, s), o, ThreadObserverI.class);
+ Ice.Instrumentation.ThreadObserver delegate = null;
+ ThreadObserverI o = observer instanceof ThreadObserverI ? (ThreadObserverI)observer : null;
+ if(_delegate != null)
+ {
+ delegate = _delegate.getThreadObserver(parent, id, s, o != null ? o.getDelegate() : observer);
+ }
+ return _threads.getObserver(new ThreadHelper(parent, id, s), o, ThreadObserverI.class, delegate);
}
catch(Exception ex)
{
- _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex);
+ _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex));
}
}
return null;
@@ -682,11 +717,18 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb
{
try
{
- return _invocations.getObserver(new InvocationHelper(prx, operation, ctx), InvocationObserverI.class);
+ Ice.Instrumentation.InvocationObserver delegate = null;
+ if(_delegate != null)
+ {
+ delegate = _delegate.getInvocationObserver(prx, operation, ctx);
+ }
+ return _invocations.getObserver(new InvocationHelper(prx, operation, ctx),
+ InvocationObserverI.class,
+ delegate);
}
catch(Exception ex)
{
- _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex);
+ _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex));
}
}
return null;
@@ -699,11 +741,16 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb
{
try
{
- return _dispatch.getObserver(new DispatchHelper(c, size), DispatchObserverI.class);
+ Ice.Instrumentation.DispatchObserver delegate = null;
+ if(_delegate != null)
+ {
+ delegate = _delegate.getDispatchObserver(c, size);
+ }
+ return _dispatch.getObserver(new DispatchHelper(c, size), DispatchObserverI.class, delegate);
}
catch(Exception ex)
{
- _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex);
+ _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex));
}
}
return null;
@@ -726,6 +773,11 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb
updater.updateThreadObservers();
}
});
+
+ if(_delegate != null)
+ {
+ _delegate.setObserverUpdater(updater);
+ }
}
public IceInternal.MetricsAdminI getMetricsAdmin()
@@ -734,10 +786,17 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb
}
final private IceInternal.MetricsAdminI _metrics;
- final private ObserverFactory<ConnectionMetrics, ConnectionObserverI> _connections;
- final private ObserverFactory<DispatchMetrics, DispatchObserverI> _dispatch;
- final private ObserverFactory<InvocationMetrics, InvocationObserverI> _invocations;
- final private ObserverFactory<ThreadMetrics, ThreadObserverI> _threads;
- final private ObserverFactory<Metrics, ObserverI> _connects;
- final private ObserverFactory<Metrics, ObserverI> _endpointLookups;
+ final private Ice.Instrumentation.CommunicatorObserver _delegate;
+ final private ObserverFactoryWithDelegate<ConnectionMetrics, ConnectionObserverI,
+ Ice.Instrumentation.ConnectionObserver> _connections;
+ final private ObserverFactoryWithDelegate<DispatchMetrics, DispatchObserverI,
+ Ice.Instrumentation.DispatchObserver> _dispatch;
+ final private ObserverFactoryWithDelegate<InvocationMetrics, InvocationObserverI,
+ Ice.Instrumentation.InvocationObserver> _invocations;
+ final private ObserverFactoryWithDelegate<ThreadMetrics, ThreadObserverI,
+ Ice.Instrumentation.ThreadObserver> _threads;
+ final private ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI,
+ Ice.Instrumentation.Observer> _connects;
+ final private ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI,
+ Ice.Instrumentation.Observer> _endpointLookups;
}
diff --git a/java/src/IceInternal/ConnectionObserverI.java b/java/src/IceInternal/ConnectionObserverI.java
index 847b0216ec8..9aa68be1280 100644
--- a/java/src/IceInternal/ConnectionObserverI.java
+++ b/java/src/IceInternal/ConnectionObserverI.java
@@ -9,7 +9,8 @@
package IceInternal;
-public class ConnectionObserverI extends IceMX.Observer<IceMX.ConnectionMetrics>
+public class ConnectionObserverI
+ extends IceMX.ObserverWithDelegate<IceMX.ConnectionMetrics, Ice.Instrumentation.ConnectionObserver>
implements Ice.Instrumentation.ConnectionObserver
{
public void
@@ -17,6 +18,10 @@ public class ConnectionObserverI extends IceMX.Observer<IceMX.ConnectionMetrics>
{
_sentBytes = num;
forEach(_sentBytesUpdate);
+ if(_delegate != null)
+ {
+ _delegate.sentBytes(num);
+ }
}
public void
@@ -24,6 +29,10 @@ public class ConnectionObserverI extends IceMX.Observer<IceMX.ConnectionMetrics>
{
_receivedBytes = num;
forEach(_receivedBytesUpdate);
+ if(_delegate != null)
+ {
+ _delegate.receivedBytes(num);
+ }
}
private MetricsUpdate<IceMX.ConnectionMetrics> _sentBytesUpdate = new MetricsUpdate<IceMX.ConnectionMetrics>()
diff --git a/java/src/IceInternal/DispatchObserverI.java b/java/src/IceInternal/DispatchObserverI.java
index 97dadc19d0d..c5366c4ac69 100644
--- a/java/src/IceInternal/DispatchObserverI.java
+++ b/java/src/IceInternal/DispatchObserverI.java
@@ -9,13 +9,18 @@
package IceInternal;
-public class DispatchObserverI extends IceMX.Observer<IceMX.DispatchMetrics>
+public class DispatchObserverI
+ extends IceMX.ObserverWithDelegate<IceMX.DispatchMetrics, Ice.Instrumentation.DispatchObserver>
implements Ice.Instrumentation.DispatchObserver
{
public void
userException()
{
forEach(_userException);
+ if(_delegate != null)
+ {
+ _delegate.userException();
+ }
}
public void
@@ -29,6 +34,10 @@ public class DispatchObserverI extends IceMX.Observer<IceMX.DispatchMetrics>
v.replySize += size;
}
});
+ if(_delegate != null)
+ {
+ _delegate.reply(size);
+ }
}
final MetricsUpdate<IceMX.DispatchMetrics> _userException = new MetricsUpdate<IceMX.DispatchMetrics>()
diff --git a/java/src/IceInternal/EndpointHostResolver.java b/java/src/IceInternal/EndpointHostResolver.java
index 25f144c66a5..57ed431299d 100644
--- a/java/src/IceInternal/EndpointHostResolver.java
+++ b/java/src/IceInternal/EndpointHostResolver.java
@@ -52,7 +52,7 @@ public class EndpointHostResolver
}
}
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
Ice.Instrumentation.Observer observer = null;
if(obsv != null)
{
@@ -110,7 +110,7 @@ public class EndpointHostResolver
entry.endpoint = endpoint;
entry.callback = callback;
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
if(obsv != null)
{
entry.observer = obsv.getEndpointLookupObserver(endpoint);
@@ -240,7 +240,7 @@ public class EndpointHostResolver
synchronized public void
updateObserver()
{
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
if(obsv != null)
{
_observer = obsv.getThreadObserver("Communicator",
diff --git a/java/src/IceInternal/Incoming.java b/java/src/IceInternal/Incoming.java
index cccdfc43a33..5b76d223f8c 100644
--- a/java/src/IceInternal/Incoming.java
+++ b/java/src/IceInternal/Incoming.java
@@ -123,7 +123,7 @@ final public class Incoming extends IncomingBase implements Ice.Request
_current.ctx.put(first, second);
}
- CommunicatorObserver obsv = _instance.initializationData().observer;
+ CommunicatorObserver obsv = _instance.getObserver();
if(obsv != null)
{
// Read the parameter encapsulation size.
diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java
index 03fbc58c434..a837018cd6a 100644
--- a/java/src/IceInternal/Instance.java
+++ b/java/src/IceInternal/Instance.java
@@ -531,6 +531,13 @@ public final class Instance
return result;
}
+ public Ice.Instrumentation.CommunicatorObserver
+ getObserver()
+ {
+ return _observer; // Immutable
+ }
+
+
public synchronized void
setDefaultLocator(Ice.LocatorPrx locator)
{
@@ -817,18 +824,20 @@ public final class Instance
// Setup the communicator observer only if the user didn't already set an
// Ice observer resolver and if the admininistrative endpoints are set.
//
- if(_initData.observer == null &&
- (_adminFacetFilter.isEmpty() || _adminFacetFilter.contains("Metrics")) &&
+ if((_adminFacetFilter.isEmpty() || _adminFacetFilter.contains("Metrics")) &&
_initData.properties.getProperty("Ice.Admin.Endpoints").length() > 0)
{
- CommunicatorObserverI observer = new CommunicatorObserverI(admin);
- _initData.observer = observer;
+ _observer = new CommunicatorObserverI(admin, _initData.observer);
//
// Make sure the admin plugin receives property updates.
//
props.addUpdateCallback(admin);
}
+ else
+ {
+ _observer = _initData.observer;
+ }
}
catch(Ice.LocalException ex)
{
@@ -882,9 +891,9 @@ public final class Instance
//
// Set observer updater
//
- if(_initData.observer != null)
+ if(_observer != null)
{
- _initData.observer.setObserverUpdater(new ObserverUpdaterI(this));
+ _observer.setObserverUpdater(new ObserverUpdaterI(this));
}
//
@@ -1235,6 +1244,7 @@ public final class Instance
private final int _clientACM; // Immutable, not reset by destroy().
private final int _serverACM; // Immutable, not reset by destroy().
private final Ice.ImplicitContextI _implicitContext;
+ private final Ice.Instrumentation.CommunicatorObserver _observer;
private RouterManager _routerManager;
private LocatorManager _locatorManager;
private ReferenceFactory _referenceFactory;
diff --git a/java/src/IceInternal/InvocationObserverI.java b/java/src/IceInternal/InvocationObserverI.java
index c36b8a581c3..dd98db9df23 100644
--- a/java/src/IceInternal/InvocationObserverI.java
+++ b/java/src/IceInternal/InvocationObserverI.java
@@ -11,7 +11,8 @@ package IceInternal;
import IceMX.*;
-public class InvocationObserverI extends IceMX.Observer<IceMX.InvocationMetrics>
+public class InvocationObserverI
+ extends IceMX.ObserverWithDelegate<IceMX.InvocationMetrics, Ice.Instrumentation.InvocationObserver>
implements Ice.Instrumentation.InvocationObserver
{
static public final class RemoteInvocationHelper extends MetricsHelper<RemoteMetrics>
@@ -117,21 +118,35 @@ public class InvocationObserverI extends IceMX.Observer<IceMX.InvocationMetrics>
userException()
{
forEach(_userException);
+ if(_delegate != null)
+ {
+ _delegate.userException();
+ }
}
public void
retried()
{
forEach(_incrementRetry);
+ if(_delegate != null)
+ {
+ _delegate.retried();
+ }
}
public Ice.Instrumentation.RemoteObserver
getRemoteObserver(Ice.ConnectionInfo con, Ice.Endpoint edpt, int requestId, int sz)
{
+ Ice.Instrumentation.RemoteObserver delegate = null;
+ if(_delegate != null)
+ {
+ delegate = _delegate.getRemoteObserver(con, edpt, requestId, sz);
+ }
return (Ice.Instrumentation.RemoteObserver)getObserver("Remote",
new RemoteInvocationHelper(con, edpt, requestId, sz),
RemoteMetrics.class,
- RemoteObserverI.class);
+ RemoteObserverI.class,
+ delegate);
}
final MetricsUpdate<InvocationMetrics> _incrementRetry = new MetricsUpdate<InvocationMetrics>()
diff --git a/java/src/IceInternal/ObserverHelper.java b/java/src/IceInternal/ObserverHelper.java
index d2b2be71177..6e1514baa8f 100644
--- a/java/src/IceInternal/ObserverHelper.java
+++ b/java/src/IceInternal/ObserverHelper.java
@@ -17,7 +17,7 @@ public final class ObserverHelper
static public InvocationObserver
get(Instance instance, String op)
{
- CommunicatorObserver obsv = instance.initializationData().observer;
+ CommunicatorObserver obsv = instance.getObserver();
if(obsv != null)
{
InvocationObserver observer = obsv.getInvocationObserver(null, op, _emptyContext);
@@ -39,8 +39,7 @@ public final class ObserverHelper
static public InvocationObserver
get(Ice.ObjectPrx proxy, String op, java.util.Map<String, String> context)
{
- CommunicatorObserver obsv =
- ((Ice.ObjectPrxHelperBase)proxy).__reference().getInstance().initializationData().observer;
+ CommunicatorObserver obsv = ((Ice.ObjectPrxHelperBase)proxy).__reference().getInstance().getObserver();
if(obsv != null)
{
InvocationObserver observer;
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 82e7c1adfe8..3bf55802f61 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -217,7 +217,7 @@ public final class OutgoingConnectionFactory
// Try to establish the connection to the connectors.
//
DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
java.util.Iterator<ConnectorInfo> q = connectors.iterator();
ConnectorInfo ci = null;
while(q.hasNext())
@@ -1212,7 +1212,7 @@ public final class OutgoingConnectionFactory
assert(_iter.hasNext());
_current = _iter.next();
- Ice.Instrumentation.CommunicatorObserver obsv = _factory._instance.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _factory._instance.getObserver();
if(obsv != null)
{
_observer = obsv.getConnectionEstablishmentObserver(_current.endpoint,
diff --git a/java/src/IceInternal/RemoteObserverI.java b/java/src/IceInternal/RemoteObserverI.java
index 4ebc62e51d2..eb5b470a765 100644
--- a/java/src/IceInternal/RemoteObserverI.java
+++ b/java/src/IceInternal/RemoteObserverI.java
@@ -9,7 +9,8 @@
package IceInternal;
-public class RemoteObserverI extends IceMX.Observer<IceMX.RemoteMetrics>
+public class RemoteObserverI
+ extends IceMX.ObserverWithDelegate<IceMX.RemoteMetrics, Ice.Instrumentation.RemoteObserver>
implements Ice.Instrumentation.RemoteObserver
{
public void
@@ -23,5 +24,9 @@ public class RemoteObserverI extends IceMX.Observer<IceMX.RemoteMetrics>
v.replySize += size;
}
});
+ if(_delegate != null)
+ {
+ _delegate.reply(size);
+ }
}
} \ No newline at end of file
diff --git a/java/src/IceInternal/ThreadObserverI.java b/java/src/IceInternal/ThreadObserverI.java
index 7086618b4f7..f92784b12c1 100644
--- a/java/src/IceInternal/ThreadObserverI.java
+++ b/java/src/IceInternal/ThreadObserverI.java
@@ -9,7 +9,9 @@
package IceInternal;
-public class ThreadObserverI extends IceMX.Observer<IceMX.ThreadMetrics> implements Ice.Instrumentation.ThreadObserver
+public class ThreadObserverI
+ extends IceMX.ObserverWithDelegate<IceMX.ThreadMetrics, Ice.Instrumentation.ThreadObserver>
+ implements Ice.Instrumentation.ThreadObserver
{
public void
stateChanged(final Ice.Instrumentation.ThreadState oldState, final Ice.Instrumentation.ThreadState newState)
@@ -17,6 +19,10 @@ public class ThreadObserverI extends IceMX.Observer<IceMX.ThreadMetrics> impleme
_oldState = oldState;
_newState = newState;
forEach(_threadStateUpdate);
+ if(_delegate != null)
+ {
+ _delegate.stateChanged(oldState, newState);
+ }
}
private MetricsUpdate<IceMX.ThreadMetrics> _threadStateUpdate = new MetricsUpdate<IceMX.ThreadMetrics>()
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 19783697d1d..b7bc1b83c50 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -622,7 +622,7 @@ public final class ThreadPool
updateObserver()
{
// Must be called with the thread pool mutex locked
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
if(obsv != null)
{
_observer = obsv.getThreadObserver(_prefix, _name, _state, _observer);
diff --git a/java/src/IceMX/ObserverFactory.java b/java/src/IceMX/ObserverFactory.java
index 9a99e7a86e1..3881877c427 100644
--- a/java/src/IceMX/ObserverFactory.java
+++ b/java/src/IceMX/ObserverFactory.java
@@ -29,14 +29,6 @@ public class ObserverFactory<T extends Metrics, O extends Observer<T>>
});
}
- public
- ObserverFactory(String name, Class<T> cl)
- {
- _name = name;
- _metrics = null;
- _class = cl;
- }
-
public void
destroy()
{
@@ -56,7 +48,14 @@ public class ObserverFactory<T extends Metrics, O extends Observer<T>>
public synchronized O
getObserver(MetricsHelper<T> helper, Object observer, Class<O> cl)
{
- O old = (O)observer;
+ O old = null;
+ try
+ {
+ old = (O)observer;
+ }
+ catch(ClassCastException ex)
+ {
+ }
java.util.List<MetricsMap<T>.Entry> metricsObjects = null;
for(MetricsMap<T> m : _maps)
{
diff --git a/java/src/IceMX/ObserverFactoryWithDelegate.java b/java/src/IceMX/ObserverFactoryWithDelegate.java
new file mode 100644
index 00000000000..e900d09df40
--- /dev/null
+++ b/java/src/IceMX/ObserverFactoryWithDelegate.java
@@ -0,0 +1,48 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceMX;
+
+public class ObserverFactoryWithDelegate<T extends Metrics,
+ OImpl extends ObserverWithDelegate<T, O>,
+ O extends Ice.Instrumentation.Observer>
+ extends ObserverFactory<T, OImpl>
+{
+ public
+ ObserverFactoryWithDelegate(IceInternal.MetricsAdminI metrics, String name, Class<T> cl)
+ {
+ super(metrics, name, cl);
+ }
+
+ @SuppressWarnings("unchecked")
+ public O
+ getObserver(MetricsHelper<T> helper, Class<OImpl> cl, O delegate)
+ {
+ OImpl o = super.getObserver(helper, cl);
+ if(o != null)
+ {
+ o.setDelegate(delegate);
+ return (O)o;
+ }
+ return delegate;
+ }
+
+ @SuppressWarnings("unchecked")
+ public O
+ getObserver(MetricsHelper<T> helper, Object observer, Class<OImpl> cl, O delegate)
+ {
+ OImpl o = super.getObserver(helper, observer, cl);
+ if(o != null)
+ {
+ o.setDelegate(delegate);
+ return (O)o;
+ }
+ return delegate;
+ }
+};
diff --git a/java/src/IceMX/ObserverWithDelegate.java b/java/src/IceMX/ObserverWithDelegate.java
new file mode 100644
index 00000000000..a699533e282
--- /dev/null
+++ b/java/src/IceMX/ObserverWithDelegate.java
@@ -0,0 +1,71 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceMX;
+
+public class ObserverWithDelegate<T extends Metrics, O extends Ice.Instrumentation.Observer> extends Observer<T>
+{
+ public void
+ attach()
+ {
+ super.attach();
+ if(_delegate != null)
+ {
+ _delegate.attach();
+ }
+ }
+
+ public void
+ detach()
+ {
+ super.detach();
+ if(_delegate != null)
+ {
+ _delegate.detach();
+ }
+ }
+
+ public void
+ failed(String exceptionName)
+ {
+ super.failed(exceptionName);
+ if(_delegate != null)
+ {
+ _delegate.failed(exceptionName);
+ }
+ }
+
+ public O
+ getDelegate()
+ {
+ return _delegate;
+ }
+
+ public void
+ setDelegate(O del)
+ {
+ _delegate = del;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <S extends Metrics, ObserverImpl extends ObserverWithDelegate<S, Obs>,
+ Obs extends Ice.Instrumentation.Observer> Obs
+ getObserver(String mapName, MetricsHelper<S> helper, Class<S> mcl, Class<ObserverImpl> ocl, Obs delegate)
+ {
+ ObserverImpl obsv = super.getObserver(mapName, helper, mcl, ocl);
+ if(obsv != null)
+ {
+ obsv.setDelegate(delegate);
+ return (Obs)obsv;
+ }
+ return delegate;
+ }
+
+ protected O _delegate;
+};
diff --git a/java/src/IceMX/ObserverI.java b/java/src/IceMX/ObserverWithDelegateI.java
index 23baf30408d..ab0c72c48ee 100644
--- a/java/src/IceMX/ObserverI.java
+++ b/java/src/IceMX/ObserverWithDelegateI.java
@@ -9,6 +9,6 @@
package IceMX;
-public class ObserverI extends Observer<Metrics>
+public class ObserverWithDelegateI extends ObserverWithDelegate<Metrics, Ice.Instrumentation.Observer>
{
};
diff --git a/java/test/Ice/metrics/AllTests.java b/java/test/Ice/metrics/AllTests.java
index 6c36493f621..35c874eaf8f 100644
--- a/java/test/Ice/metrics/AllTests.java
+++ b/java/test/Ice/metrics/AllTests.java
@@ -397,7 +397,7 @@ public class AllTests
}
static MetricsPrx
- allTests(Ice.Communicator communicator, PrintWriter out)
+ allTests(Ice.Communicator communicator, PrintWriter out, CommunicatorObserverI obsv)
throws IceMX.UnknownMetricsView
{
MetricsPrx metrics = MetricsPrxHelper.checkedCast(communicator.stringToProxy("metrics:default -p 12010"));
@@ -1076,6 +1076,41 @@ public class AllTests
out.println("ok");
+ out.print("testing instrumentation observer delegate... ");
+ out.flush();
+
+ test(obsv.threadObserver.total > 0);
+ test(obsv.connectionObserver.total > 0);
+ test(obsv.connectionEstablishmentObserver.total > 0);
+ test(obsv.endpointLookupObserver.total > 0);
+ test(obsv.dispatchObserver.total > 0);
+ test(obsv.invocationObserver.total > 0);
+ test(obsv.invocationObserver.remoteObserver.total > 0);
+
+ test(obsv.threadObserver.current > 0);
+ test(obsv.connectionObserver.current > 0);
+ test(obsv.connectionEstablishmentObserver.current == 0);
+ test(obsv.endpointLookupObserver.current == 0);
+ test(obsv.dispatchObserver.current == 0);
+ test(obsv.invocationObserver.current == 0);
+ test(obsv.invocationObserver.remoteObserver.current == 0);
+
+ test(obsv.threadObserver.failedCount == 0);
+ test(obsv.connectionObserver.failedCount > 0);
+ test(obsv.connectionEstablishmentObserver.failedCount > 0);
+ test(obsv.endpointLookupObserver.failedCount > 0);
+ //test(obsv.dispatchObserver.failedCount > 0);
+ test(obsv.invocationObserver.failedCount > 0);
+ test(obsv.invocationObserver.remoteObserver.failedCount > 0);
+
+ test(obsv.threadObserver.states > 0);
+ test(obsv.connectionObserver.received > 0 && obsv.connectionObserver.sent > 0);
+ //test(obsv.dispatchObserver.userExceptionCount > 0);
+ test(obsv.invocationObserver.userExceptionCount > 0 && obsv.invocationObserver.retriedCount > 0);
+ test(obsv.invocationObserver.remoteObserver.replySize > 0);
+
+ out.println("ok");
+
return metrics;
}
}
diff --git a/java/test/Ice/metrics/Client.java b/java/test/Ice/metrics/Client.java
index 543819d1672..7ed4770bf06 100644
--- a/java/test/Ice/metrics/Client.java
+++ b/java/test/Ice/metrics/Client.java
@@ -18,7 +18,7 @@ public class Client extends test.Util.Application
Ice.Communicator communicator = communicator();
try
{
- MetricsPrx metrics = AllTests.allTests(communicator, getWriter());
+ MetricsPrx metrics = AllTests.allTests(communicator, getWriter(), _observer);
metrics.shutdown();
}
catch(Ice.UserException ex)
@@ -40,6 +40,7 @@ public class Client extends test.Util.Application
initData.properties.setProperty("Ice.Admin.DelayCreation", "1");
initData.properties.setProperty("Ice.Warn.Connections", "0");
initData.properties.setProperty("Ice.MessageSizeMax", "50000");
+ initData.observer = _observer;
return initData;
}
@@ -50,4 +51,6 @@ public class Client extends test.Util.Application
System.gc();
System.exit(result);
}
+
+ private CommunicatorObserverI _observer = new CommunicatorObserverI();
}
diff --git a/java/test/Ice/metrics/CommunicatorObserverI.java b/java/test/Ice/metrics/CommunicatorObserverI.java
new file mode 100644
index 00000000000..2973880f884
--- /dev/null
+++ b/java/test/Ice/metrics/CommunicatorObserverI.java
@@ -0,0 +1,140 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.metrics;
+
+class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorObserver
+{
+ private static void
+ test(boolean b)
+ {
+ if(!b)
+ {
+ throw new RuntimeException();
+ }
+ }
+
+ public void
+ setObserverUpdater(Ice.Instrumentation.ObserverUpdater u)
+ {
+ updater = u;
+ }
+
+ synchronized public Ice.Instrumentation.Observer
+ getConnectionEstablishmentObserver(Ice.Endpoint e, String s)
+ {
+ if(connectionEstablishmentObserver == null)
+ {
+ connectionEstablishmentObserver = new ObserverI();
+ connectionEstablishmentObserver.reset();
+ }
+ return connectionEstablishmentObserver;
+ }
+
+
+ synchronized public Ice.Instrumentation.Observer
+ getEndpointLookupObserver(Ice.Endpoint e)
+ {
+ if(endpointLookupObserver == null)
+ {
+ endpointLookupObserver = new ObserverI();
+ endpointLookupObserver.reset();
+ }
+ return endpointLookupObserver;
+ }
+
+ synchronized public Ice.Instrumentation.ConnectionObserver
+ getConnectionObserver(Ice.ConnectionInfo c,
+ Ice.Endpoint e,
+ Ice.Instrumentation.ConnectionState s,
+ Ice.Instrumentation.ConnectionObserver old)
+ {
+ test(old == null || old instanceof ConnectionObserverI);
+ if(connectionObserver == null)
+ {
+ connectionObserver = new ConnectionObserverI();
+ connectionObserver.reset();
+ }
+ return connectionObserver;
+ }
+
+ synchronized public Ice.Instrumentation.ThreadObserver
+ getThreadObserver(String p, String id, Ice.Instrumentation.ThreadState s,
+ Ice.Instrumentation.ThreadObserver old)
+ {
+ test(old == null || old instanceof ThreadObserverI);
+ if(threadObserver == null)
+ {
+ threadObserver = new ThreadObserverI();
+ threadObserver.reset();
+ }
+ return threadObserver;
+ }
+
+ synchronized public Ice.Instrumentation.InvocationObserver
+ getInvocationObserver(Ice.ObjectPrx p, String op, java.util.Map<String, String> ctx)
+ {
+ if(invocationObserver == null)
+ {
+ invocationObserver = new InvocationObserverI();
+ invocationObserver.reset();
+ }
+ return invocationObserver;
+ }
+
+ synchronized public Ice.Instrumentation.DispatchObserver
+ getDispatchObserver(Ice.Current current, int s)
+ {
+ if(dispatchObserver == null)
+ {
+ dispatchObserver = new DispatchObserverI();
+ dispatchObserver.reset();
+ }
+ return dispatchObserver;
+ }
+
+ synchronized void
+ reset()
+ {
+ if(connectionEstablishmentObserver != null)
+ {
+ connectionEstablishmentObserver.reset();
+ }
+ if(endpointLookupObserver != null)
+ {
+ endpointLookupObserver.reset();
+ }
+ if(connectionObserver != null)
+ {
+ connectionObserver.reset();
+ }
+ if(threadObserver != null)
+ {
+ threadObserver.reset();
+ }
+ if(invocationObserver != null)
+ {
+ invocationObserver.reset();
+ }
+ if(dispatchObserver != null)
+ {
+ dispatchObserver.reset();
+ }
+ }
+
+ Ice.Instrumentation.ObserverUpdater updater;
+
+ ObserverI connectionEstablishmentObserver;
+ ObserverI endpointLookupObserver;
+ ConnectionObserverI connectionObserver;
+ ThreadObserverI threadObserver;
+ InvocationObserverI invocationObserver;
+ DispatchObserverI dispatchObserver;
+};
+
diff --git a/java/test/Ice/metrics/ConnectionObserverI.java b/java/test/Ice/metrics/ConnectionObserverI.java
new file mode 100644
index 00000000000..721d527d748
--- /dev/null
+++ b/java/test/Ice/metrics/ConnectionObserverI.java
@@ -0,0 +1,37 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.metrics;
+
+class ConnectionObserverI extends ObserverI implements Ice.Instrumentation.ConnectionObserver
+{
+ public synchronized void
+ reset()
+ {
+ super.reset();
+ received = 0;
+ sent = 0;
+ }
+
+ public synchronized void
+ sentBytes(int s)
+ {
+ sent += s;
+ }
+
+ public synchronized void
+ receivedBytes(int s)
+ {
+ received += s;
+ }
+
+ int sent;
+ int received;
+};
+
diff --git a/java/test/Ice/metrics/DispatchObserverI.java b/java/test/Ice/metrics/DispatchObserverI.java
new file mode 100644
index 00000000000..1a29c236bd9
--- /dev/null
+++ b/java/test/Ice/metrics/DispatchObserverI.java
@@ -0,0 +1,37 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.metrics;
+
+class DispatchObserverI extends ObserverI implements Ice.Instrumentation.DispatchObserver
+{
+ public synchronized void
+ reset()
+ {
+ super.reset();
+ userExceptionCount = 0;
+ replySize = 0;
+ }
+
+ public synchronized void
+ userException()
+ {
+ ++userExceptionCount;
+ }
+
+ public synchronized void
+ reply(int s)
+ {
+ replySize += s;
+ }
+
+ int userExceptionCount;
+ int replySize;
+};
+
diff --git a/java/test/Ice/metrics/InvocationObserverI.java b/java/test/Ice/metrics/InvocationObserverI.java
new file mode 100644
index 00000000000..447ea094c5b
--- /dev/null
+++ b/java/test/Ice/metrics/InvocationObserverI.java
@@ -0,0 +1,53 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.metrics;
+
+class InvocationObserverI extends ObserverI implements Ice.Instrumentation.InvocationObserver
+{
+ public synchronized void
+ reset()
+ {
+ super.reset();
+ retriedCount = 0;
+ userExceptionCount = 0;
+ if(remoteObserver != null)
+ {
+ remoteObserver.reset();
+ }
+ }
+
+ public synchronized void
+ retried()
+ {
+ ++retriedCount;
+ }
+
+ public synchronized void
+ userException()
+ {
+ ++userExceptionCount;
+ }
+
+ public synchronized Ice.Instrumentation.RemoteObserver
+ getRemoteObserver(Ice.ConnectionInfo c, Ice.Endpoint e, int a, int b)
+ {
+ if(remoteObserver == null)
+ {
+ remoteObserver = new RemoteObserverI();
+ remoteObserver.reset();
+ }
+ return remoteObserver;
+ }
+
+ int userExceptionCount;
+ int retriedCount;
+
+ RemoteObserverI remoteObserver = null;
+};
diff --git a/java/test/Ice/metrics/ObserverI.java b/java/test/Ice/metrics/ObserverI.java
new file mode 100644
index 00000000000..46ec62a275b
--- /dev/null
+++ b/java/test/Ice/metrics/ObserverI.java
@@ -0,0 +1,42 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.metrics;
+
+class ObserverI implements Ice.Instrumentation.Observer
+{
+ synchronized public void
+ reset()
+ {
+ total = 0;
+ current = 0;
+ failedCount = 0;
+ }
+
+ synchronized public void
+ attach()
+ {
+ ++total;
+ ++current;
+ }
+ synchronized public void
+ detach()
+ {
+ --current;
+ }
+ synchronized public void
+ failed(String s)
+ {
+ ++failedCount;
+ }
+
+ int total;
+ int current;
+ int failedCount;
+};
diff --git a/java/test/Ice/metrics/RemoveObserverI.java b/java/test/Ice/metrics/RemoveObserverI.java
new file mode 100644
index 00000000000..82ecf2c9016
--- /dev/null
+++ b/java/test/Ice/metrics/RemoveObserverI.java
@@ -0,0 +1,29 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.metrics;
+
+class RemoteObserverI extends ObserverI implements Ice.Instrumentation.RemoteObserver
+{
+ public synchronized void
+ reset()
+ {
+ super.reset();
+ replySize = 0;
+ }
+
+ public synchronized void
+ reply(int s)
+ {
+ replySize += s;
+ }
+
+ int replySize;
+};
+
diff --git a/java/test/Ice/metrics/ThreadObserverI.java b/java/test/Ice/metrics/ThreadObserverI.java
new file mode 100644
index 00000000000..4221039ae22
--- /dev/null
+++ b/java/test/Ice/metrics/ThreadObserverI.java
@@ -0,0 +1,29 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.metrics;
+
+class ThreadObserverI extends ObserverI implements Ice.Instrumentation.ThreadObserver
+{
+ public synchronized void
+ reset()
+ {
+ super.reset();
+ states = 0;
+ }
+
+ public synchronized void
+ stateChanged(Ice.Instrumentation.ThreadState o, Ice.Instrumentation.ThreadState n)
+ {
+ ++states;
+ }
+
+ int states;
+};
+