summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2013-09-09 19:12:28 +0200
committerBenoit Foucher <benoit@zeroc.com>2013-09-09 19:12:28 +0200
commit8a0d1c7e34d8bd18bd85666cce94403a5158975c (patch)
tree36050a818282c0a92ee88a6ef28354e186c5aebe /cpp
parentTest scripts improvements (diff)
downloadice-8a0d1c7e34d8bd18bd85666cce94403a5158975c.tar.bz2
ice-8a0d1c7e34d8bd18bd85666cce94403a5158975c.tar.xz
ice-8a0d1c7e34d8bd18bd85666cce94403a5158975c.zip
Fixed ICE-5196: allow setting an observer with IceMX enabled
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/Ice/MetricsObserverI.h32
-rw-r--r--cpp/src/Ice/CommunicatorI.cpp2
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp4
-rw-r--r--cpp/src/Ice/ConnectionI.cpp20
-rw-r--r--cpp/src/Ice/EndpointI.cpp6
-rw-r--r--cpp/src/Ice/Incoming.cpp2
-rw-r--r--cpp/src/Ice/Instance.cpp28
-rw-r--r--cpp/src/Ice/Instance.h6
-rw-r--r--cpp/src/Ice/InstrumentationI.cpp120
-rw-r--r--cpp/src/Ice/InstrumentationI.h146
-rw-r--r--cpp/src/Ice/ObserverHelper.cpp8
-rw-r--r--cpp/src/Ice/ThreadPool.cpp2
-rw-r--r--cpp/test/Ice/metrics/AllTests.cpp37
-rw-r--r--cpp/test/Ice/metrics/Client.cpp11
-rw-r--r--cpp/test/Ice/metrics/InstrumentationI.h334
15 files changed, 680 insertions, 78 deletions
diff --git a/cpp/include/Ice/MetricsObserverI.h b/cpp/include/Ice/MetricsObserverI.h
index 797e9fcbf83..1ea304df7c3 100644
--- a/cpp/include/Ice/MetricsObserverI.h
+++ b/cpp/include/Ice/MetricsObserverI.h
@@ -430,10 +430,6 @@ public:
_metrics->registerMap<MetricsType>(name, this);
}
- ObserverFactoryT(const std::string& name) : _name(name)
- {
- }
-
~ObserverFactoryT()
{
if(_metrics)
@@ -446,6 +442,10 @@ public:
getObserver(const MetricsHelperT<MetricsType>& helper)
{
IceUtil::Mutex::Lock sync(*this);
+ if(!_metrics)
+ {
+ return 0;
+ }
typename ObserverImplType::EntrySeqType metricsObjects;
for(typename MetricsMapSeqType::const_iterator p = _maps.begin(); p != _maps.end(); ++p)
@@ -470,13 +470,18 @@ public:
template<typename ObserverPtrType> ObserverImplPtrType
getObserver(const MetricsHelperT<MetricsType>& helper, const ObserverPtrType& observer)
{
- if(!observer)
+ ObserverImplPtrType old = ObserverImplPtrType::dynamicCast(observer);
+ if(!observer || !old)
{
return getObserver(helper);
}
IceUtil::Mutex::Lock sync(*this);
- ObserverImplPtrType old = ObserverImplPtrType::dynamicCast(observer);
+ if(!_metrics)
+ {
+ return 0;
+ }
+
typename ObserverImplType::EntrySeqType metricsObjects;
for(typename MetricsMapSeqType::const_iterator p = _maps.begin(); p != _maps.end(); ++p)
{
@@ -501,6 +506,7 @@ public:
template<typename SubMapMetricsType> void
registerSubMap(const std::string& subMap, MetricsMap MetricsType::* member)
{
+ assert(_metrics);
_metrics->registerSubMap<SubMapMetricsType>(_name, subMap, member);
}
@@ -514,6 +520,11 @@ public:
UpdaterPtr updater;
{
IceUtil::Mutex::Lock sync(*this);
+ if(!_metrics)
+ {
+ return;
+ }
+
std::vector<IceInternal::MetricsMapIPtr> maps = _metrics->getMaps(_name);
_maps.clear();
for(std::vector<IceInternal::MetricsMapIPtr>::const_iterator p = maps.begin(); p != maps.end(); ++p)
@@ -537,9 +548,16 @@ public:
_updater = updater;
}
+ void destroy()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ _metrics = 0;
+ _maps.clear();
+ }
+
private:
- const IceInternal::MetricsAdminIPtr _metrics;
+ IceInternal::MetricsAdminIPtr _metrics;
const std::string _name;
MetricsMapSeqType _maps;
volatile bool _enabled;
diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp
index f3e17c4dd2d..4646479d978 100644
--- a/cpp/src/Ice/CommunicatorI.cpp
+++ b/cpp/src/Ice/CommunicatorI.cpp
@@ -261,7 +261,7 @@ Ice::CommunicatorI::getStats() const
Ice::Instrumentation::CommunicatorObserverPtr
Ice::CommunicatorI::getObserver() const
{
- return _instance->initializationData().observer;
+ return _instance->getObserver();
}
RouterPrx
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index 60534489cd9..822e26b84ec 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -225,7 +225,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
// Try to establish the connection to the connectors.
//
DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
- const CommunicatorObserverPtr& obsv = _instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _instance->getObserver();
vector<ConnectorInfo>::const_iterator q;
for(q = connectors.begin(); q != connectors.end(); ++q)
{
@@ -1122,7 +1122,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector()
try
{
- const CommunicatorObserverPtr& obsv = _factory->_instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _factory->_instance->getObserver();
if(obsv)
{
_observer = obsv->getConnectionEstablishmentObserver(_iter->endpoint, _iter->connector->toString());
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 8091d2f2233..fbf9b2ea5f6 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -525,11 +525,11 @@ Ice::ConnectionI::updateObserver()
return;
}
- assert(_instance->initializationData().observer);
- _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(),
- _endpoint,
- toConnectionState(_state),
- _observer.get()));
+ assert(_instance->getObserver());
+ _observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ toConnectionState(_state),
+ _observer.get()));
}
void
@@ -2188,16 +2188,16 @@ Ice::ConnectionI::setState(State state)
}
}
- if(_instance->initializationData().observer)
+ if(_instance->getObserver())
{
ConnectionState oldState = toConnectionState(_state);
ConnectionState newState = toConnectionState(state);
if(oldState != newState)
{
- _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(),
- _endpoint,
- newState,
- _observer.get()));
+ _observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(),
+ _endpoint,
+ newState,
+ _observer.get()));
}
if(_observer && state == StateClosed && _exception.get())
{
diff --git a/cpp/src/Ice/EndpointI.cpp b/cpp/src/Ice/EndpointI.cpp
index a27c4082faf..7d70754021b 100644
--- a/cpp/src/Ice/EndpointI.cpp
+++ b/cpp/src/Ice/EndpointI.cpp
@@ -139,7 +139,7 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En
}
ObserverHelperT<> observer;
- const CommunicatorObserverPtr& obsv = _instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _instance->getObserver();
if(obsv)
{
observer.attach(obsv->getEndpointLookupObserver(endpoint));
@@ -201,7 +201,7 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En
entry.endpoint = endpoint;
entry.callback = callback;
- const CommunicatorObserverPtr& obsv = _instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _instance->getObserver();
if(obsv)
{
entry.observer = obsv->getEndpointLookupObserver(endpoint);
@@ -310,7 +310,7 @@ void
IceInternal::EndpointHostResolver::updateObserver()
{
Lock sync(*this);
- const CommunicatorObserverPtr& obsv = _instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _instance->getObserver();
if(obsv)
{
_observer.attach(obsv->getThreadObserver("Communicator", name(), ThreadStateIdle, _observer.get()));
diff --git a/cpp/src/Ice/Incoming.cpp b/cpp/src/Ice/Incoming.cpp
index 928c3c50088..e516940c84f 100644
--- a/cpp/src/Ice/Incoming.cpp
+++ b/cpp/src/Ice/Incoming.cpp
@@ -592,7 +592,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
_current.ctx.insert(_current.ctx.end(), pr);
}
- const CommunicatorObserverPtr& obsv = _is->instance()->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _is->instance()->getObserver();
if(obsv)
{
// Read the parameter encapsulation size.
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp
index f5a61cb9995..19fa9701352 100644
--- a/cpp/src/Ice/Instance.cpp
+++ b/cpp/src/Ice/Instance.cpp
@@ -1148,18 +1148,20 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi
// Setup the communicator observer only if the user didn't already set an
// Ice observer resolver and if the admininistrative endpoints are set.
//
- if(!_initData.observer &&
- (_adminFacetFilter.empty() || _adminFacetFilter.find("Metrics") != _adminFacetFilter.end()) &&
+ if((_adminFacetFilter.empty() || _adminFacetFilter.find("Metrics") != _adminFacetFilter.end()) &&
_initData.properties->getProperty("Ice.Admin.Endpoints") != "")
{
- CommunicatorObserverIPtr observer = new CommunicatorObserverI(_metricsAdmin);
- _initData.observer = observer;
+ _observer = new CommunicatorObserverI(_metricsAdmin, _initData.observer);
//
// Make sure the admin plugin receives property updates.
//
props->addUpdateCallback(_metricsAdmin);
}
+ else
+ {
+ _observer = _initData.observer;
+ }
__setNoDelete(false);
}
@@ -1229,10 +1231,10 @@ IceInternal::Instance::finishSetup(int& argc, char* argv[])
//
// Set observer updater
//
- if(_initData.observer)
+ if(_observer)
{
- theCollector->updateObserver(_initData.observer);
- _initData.observer->setObserverUpdater(new ObserverUpdaterI(this));
+ theCollector->updateObserver(_observer);
+ _observer->setObserverUpdater(new ObserverUpdaterI(this));
}
//
@@ -1398,18 +1400,20 @@ IceInternal::Instance::destroy()
_retryQueue->destroy();
}
- if(_initData.observer && theCollector)
+ if(_observer && theCollector)
{
- theCollector->clearObserver(_initData.observer);
+ theCollector->clearObserver(_observer);
}
if(_metricsAdmin)
{
_metricsAdmin->destroy();
_metricsAdmin = 0;
- if(CommunicatorObserverIPtr::dynamicCast(_initData.observer))
+
+ // Break cyclic reference counts. Don't clear _observer, it's immutable.
+ if(_observer)
{
- _initData.observer = 0; // Clear cyclic reference counts.
+ CommunicatorObserverIPtr::dynamicCast(_observer)->destroy();
}
}
@@ -1567,7 +1571,7 @@ IceInternal::Instance::updateThreadObservers()
_endpointHostResolver->updateObserver();
}
assert(theCollector);
- theCollector->updateObserver(_initData.observer);
+ theCollector->updateObserver(_observer);
}
catch(const Ice::CommunicatorDestroyedException&)
{
diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h
index 16aa25648b0..34261196016 100644
--- a/cpp/src/Ice/Instance.h
+++ b/cpp/src/Ice/Instance.h
@@ -93,6 +93,11 @@ public:
void addAdminFacet(const Ice::ObjectPtr&, const std::string&);
Ice::ObjectPtr removeAdminFacet(const std::string&);
Ice::ObjectPtr findAdminFacet(const std::string&);
+
+ const Ice::Instrumentation::CommunicatorObserverPtr& getObserver() const
+ {
+ return _observer;
+ }
const Ice::ImplicitContextIPtr& getImplicitContext() const
{
@@ -157,6 +162,7 @@ private:
Ice::Identity _adminIdentity;
std::set<std::string> _adminFacetFilter;
IceInternal::MetricsAdminIPtr _metricsAdmin;
+ Ice::Instrumentation::CommunicatorObserverPtr _observer;
};
class ProcessI : public Ice::Process
diff --git a/cpp/src/Ice/InstrumentationI.cpp b/cpp/src/Ice/InstrumentationI.cpp
index 82598bbf1dc..fcfae4200a7 100644
--- a/cpp/src/Ice/InstrumentationI.cpp
+++ b/cpp/src/Ice/InstrumentationI.cpp
@@ -694,48 +694,81 @@ void
ConnectionObserverI::sentBytes(Int num)
{
forEach(add(&ConnectionMetrics::sentBytes, num));
+ if(_delegate)
+ {
+ _delegate->sentBytes(num);
+ }
}
void
ConnectionObserverI::receivedBytes(Int num)
{
forEach(add(&ConnectionMetrics::receivedBytes, num));
+ if(_delegate)
+ {
+ _delegate->receivedBytes(num);
+ }
}
void
ThreadObserverI::stateChanged(ThreadState oldState, ThreadState newState)
{
forEach(ThreadStateChanged(oldState, newState));
+ if(_delegate)
+ {
+ _delegate->stateChanged(oldState, newState);
+ }
+
}
void
DispatchObserverI::userException()
{
forEach(inc(&DispatchMetrics::userException));
+ if(_delegate)
+ {
+ _delegate->userException();
+ }
}
void
DispatchObserverI::reply(Int size)
{
forEach(add(&DispatchMetrics::replySize, size));
+ if(_delegate)
+ {
+ _delegate->reply(size);
+ }
}
void
RemoteObserverI::reply(Int size)
{
forEach(add(&RemoteMetrics::replySize, size));
+ if(_delegate)
+ {
+ _delegate->reply(size);
+ }
}
void
InvocationObserverI::retried()
{
forEach(inc(&InvocationMetrics::retry));
+ if(_delegate)
+ {
+ _delegate->retried();
+ }
}
void
InvocationObserverI::userException()
{
forEach(inc(&InvocationMetrics::userException));
+ if(_delegate)
+ {
+ _delegate->userException();
+ }
}
RemoteObserverPtr
@@ -746,7 +779,14 @@ InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection,
{
try
{
- return getObserver<RemoteObserverI>("Remote", RemoteInvocationHelper(connection, endpoint, requestId, size));
+ RemoteObserverPtr delegate;
+ if(_delegate)
+ {
+ delegate = _delegate->getRemoteObserver(connection, endpoint, requestId, size);
+ }
+ return getObserver<RemoteObserverI>("Remote",
+ RemoteInvocationHelper(connection, endpoint, requestId, size),
+ delegate);
}
catch(const exception&)
{
@@ -754,8 +794,11 @@ InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection,
return 0;
}
-CommunicatorObserverI::CommunicatorObserverI(const IceInternal::MetricsAdminIPtr& metrics) :
+CommunicatorObserverI::CommunicatorObserverI(const IceInternal::MetricsAdminIPtr& metrics,
+ const Ice::Instrumentation::CommunicatorObserverPtr& delegate) :
_metrics(metrics),
+ _logger(metrics->getLogger()),
+ _delegate(delegate),
_connections(metrics, "Connection"),
_dispatch(metrics, "Dispatch"),
_invocations(metrics, "Invocation"),
@@ -771,6 +814,10 @@ CommunicatorObserverI::setObserverUpdater(const ObserverUpdaterPtr& updater)
{
_connections.setUpdater(newUpdater(updater, &ObserverUpdater::updateConnectionObservers));
_threads.setUpdater(newUpdater(updater, &ObserverUpdater::updateThreadObservers));
+ if(_delegate)
+ {
+ _delegate->setObserverUpdater(updater);
+ }
}
ObserverPtr
@@ -780,11 +827,16 @@ CommunicatorObserverI::getConnectionEstablishmentObserver(const EndpointPtr& end
{
try
{
- return _connects.getObserver(EndpointHelper(endpt, connector));
+ ObserverPtr delegate;
+ if(_delegate)
+ {
+ delegate = _delegate->getConnectionEstablishmentObserver(endpt, connector);
+ }
+ return _connects.getObserver(EndpointHelper(endpt, connector), delegate);
}
catch(const exception& ex)
{
- Error error(_metrics->getLogger());
+ Error error(_logger);
error << "unexpected exception trying to obtain observer:\n" << ex;
}
}
@@ -798,11 +850,16 @@ CommunicatorObserverI::getEndpointLookupObserver(const EndpointPtr& endpt)
{
try
{
- return _endpointLookups.getObserver(EndpointHelper(endpt));
+ ObserverPtr delegate;
+ if(_delegate)
+ {
+ delegate = _delegate->getEndpointLookupObserver(endpt);
+ }
+ return _endpointLookups.getObserver(EndpointHelper(endpt), delegate);
}
catch(const exception& ex)
{
- Error error(_metrics->getLogger());
+ Error error(_logger);
error << "unexpected exception trying to obtain observer:\n" << ex;
}
}
@@ -819,11 +876,17 @@ CommunicatorObserverI::getConnectionObserver(const ConnectionInfoPtr& con,
{
try
{
- return _connections.getObserver(ConnectionHelper(con, endpt, state), observer);
+ ConnectionObserverPtr delegate;
+ ConnectionObserverI* o = dynamic_cast<ConnectionObserverI*>(observer.get());
+ if(_delegate)
+ {
+ delegate = _delegate->getConnectionObserver(con, endpt, state, o ? o->getDelegate() : observer);
+ }
+ return _connections.getObserver(ConnectionHelper(con, endpt, state), delegate, observer);
}
catch(const exception& ex)
{
- Error error(_metrics->getLogger());
+ Error error(_logger);
error << "unexpected exception trying to obtain observer:\n" << ex;
}
}
@@ -840,11 +903,17 @@ CommunicatorObserverI::getThreadObserver(const string& parent,
{
try
{
- return _threads.getObserver(ThreadHelper(parent, id, state), observer);
+ ThreadObserverPtr delegate;
+ ThreadObserverI* o = dynamic_cast<ThreadObserverI*>(observer.get());
+ if(_delegate)
+ {
+ delegate = _delegate->getThreadObserver(parent, id, state, o ? o->getDelegate() : observer);
+ }
+ return _threads.getObserver(ThreadHelper(parent, id, state), delegate, observer);
}
catch(const exception& ex)
{
- Error error(_metrics->getLogger());
+ Error error(_logger);
error << "unexpected exception trying to obtain observer:\n" << ex;
}
}
@@ -858,11 +927,16 @@ CommunicatorObserverI::getInvocationObserver(const ObjectPrx& proxy, const strin
{
try
{
- return _invocations.getObserver(InvocationHelper(proxy, op, ctx));
+ InvocationObserverPtr delegate;
+ if(_delegate)
+ {
+ delegate = _delegate->getInvocationObserver(proxy, op, ctx);
+ }
+ return _invocations.getObserver(InvocationHelper(proxy, op, ctx), delegate);
}
catch(const exception& ex)
{
- Error error(_metrics->getLogger());
+ Error error(_logger);
error << "unexpected exception trying to obtain observer:\n" << ex;
}
}
@@ -876,11 +950,16 @@ CommunicatorObserverI::getDispatchObserver(const Current& current, int size)
{
try
{
- return _dispatch.getObserver(DispatchHelper(current, size));
+ DispatchObserverPtr delegate;
+ if(_delegate)
+ {
+ delegate = _delegate->getDispatchObserver(current, size);
+ }
+ return _dispatch.getObserver(DispatchHelper(current, size), delegate);
}
catch(const exception& ex)
{
- Error error(_metrics->getLogger());
+ Error error(_logger);
error << "unexpected exception trying to obtain observer:\n" << ex;
}
}
@@ -890,5 +969,18 @@ CommunicatorObserverI::getDispatchObserver(const Current& current, int size)
const IceInternal::MetricsAdminIPtr&
CommunicatorObserverI::getMetricsAdmin() const
{
+ assert(_metrics);
return _metrics;
}
+
+void
+CommunicatorObserverI::destroy()
+{
+ _metrics = 0;
+ _connections.destroy();
+ _dispatch.destroy();
+ _invocations.destroy();
+ _threads.destroy();
+ _connects.destroy();
+ _endpointLookups.destroy();
+}
diff --git a/cpp/src/Ice/InstrumentationI.h b/cpp/src/Ice/InstrumentationI.h
index 9f6b61327b9..1db093feae2 100644
--- a/cpp/src/Ice/InstrumentationI.h
+++ b/cpp/src/Ice/InstrumentationI.h
@@ -16,6 +16,109 @@
namespace IceInternal
{
+template<typename T, typename O> class ObserverWithDelegateT : public IceMX::ObserverT<T>, virtual public O
+{
+public:
+
+ typedef O ObserverType;
+ typedef typename IceInternal::Handle<O> ObserverPtrType;
+
+ virtual void
+ attach()
+ {
+ IceMX::ObserverT<T>::attach();
+ if(_delegate)
+ {
+ _delegate->attach();
+ }
+ }
+
+ virtual void
+ detach()
+ {
+ IceMX::ObserverT<T>::detach();
+ if(_delegate)
+ {
+ _delegate->detach();
+ }
+ }
+
+ virtual void
+ failed(const std::string& exceptionName)
+ {
+ IceMX::ObserverT<T>::failed(exceptionName);
+ if(_delegate)
+ {
+ _delegate->failed(exceptionName);
+ }
+ }
+
+ ObserverPtrType
+ getDelegate() const
+ {
+ return _delegate;
+ }
+
+ void
+ setDelegate(ObserverPtrType delegate)
+ {
+ _delegate = delegate;
+ }
+
+ template<typename ObserverImpl, typename ObserverMetricsType, typename ObserverPtrType> ObserverPtrType
+ getObserver(const std::string& mapName, const IceMX::MetricsHelperT<ObserverMetricsType>& helper,
+ const ObserverPtrType& del)
+ {
+ IceInternal::Handle<ObserverImpl> obsv = IceMX::ObserverT<T>::template getObserver<ObserverImpl>(mapName,
+ helper);
+ if(obsv)
+ {
+ obsv->setDelegate(del);
+ return obsv;
+ }
+ return del;
+ }
+
+protected:
+
+ ObserverPtrType _delegate;
+};
+
+template<typename T> class ObserverFactoryWithDelegateT : public IceMX::ObserverFactoryT<T>
+{
+public:
+
+ ObserverFactoryWithDelegateT(const IceInternal::MetricsAdminIPtr& metrics, const std::string& name) :
+ IceMX::ObserverFactoryT<T>(metrics, name)
+ {
+ }
+
+ template<typename ObserverMetricsType, typename ObserverPtrType> ObserverPtrType
+ getObserver(const IceMX::MetricsHelperT<ObserverMetricsType>& helper, const ObserverPtrType& del)
+ {
+ IceInternal::Handle<T> obsv = IceMX::ObserverFactoryT<T>::getObserver(helper);
+ if(obsv)
+ {
+ obsv->setDelegate(del);
+ return obsv;
+ }
+ return del;
+ }
+
+ template<typename ObserverMetricsType, typename ObserverPtrType> ObserverPtrType
+ getObserver(const IceMX::MetricsHelperT<ObserverMetricsType>& helper, const ObserverPtrType& del,
+ const ObserverPtrType& old)
+ {
+ IceInternal::Handle<T> obsv = IceMX::ObserverFactoryT<T>::getObserver(helper, old);
+ if(obsv)
+ {
+ obsv->setDelegate(del);
+ return obsv;
+ }
+ return del;
+ }
+};
+
template<typename Helper>
void addEndpointAttributes(typename Helper::Attributes& attrs)
{
@@ -49,8 +152,8 @@ void addConnectionAttributes(typename Helper::Attributes& attrs)
addEndpointAttributes<Helper>(attrs);
}
-class ConnectionObserverI : public Ice::Instrumentation::ConnectionObserver,
- public IceMX::ObserverT<IceMX::ConnectionMetrics>
+class ConnectionObserverI : public ObserverWithDelegateT<IceMX::ConnectionMetrics,
+ Ice::Instrumentation::ConnectionObserver>
{
public:
@@ -58,16 +161,14 @@ public:
virtual void receivedBytes(Ice::Int);
};
-class ThreadObserverI : public Ice::Instrumentation::ThreadObserver,
- public IceMX::ObserverT<IceMX::ThreadMetrics>
+class ThreadObserverI : public ObserverWithDelegateT<IceMX::ThreadMetrics, Ice::Instrumentation::ThreadObserver>
{
public:
virtual void stateChanged(Ice::Instrumentation::ThreadState, Ice::Instrumentation::ThreadState);
};
-class DispatchObserverI : public Ice::Instrumentation::DispatchObserver,
- public IceMX::ObserverT<IceMX::DispatchMetrics>
+class DispatchObserverI : public ObserverWithDelegateT<IceMX::DispatchMetrics, Ice::Instrumentation::DispatchObserver>
{
public:
@@ -76,14 +177,15 @@ public:
virtual void reply(Ice::Int);
};
-class RemoteObserverI : public Ice::Instrumentation::RemoteObserver,
- public IceMX::ObserverT<IceMX::RemoteMetrics>
+class RemoteObserverI : public ObserverWithDelegateT<IceMX::RemoteMetrics, Ice::Instrumentation::RemoteObserver>
{
+public:
+
virtual void reply(Ice::Int);
};
-class InvocationObserverI : public Ice::Instrumentation::InvocationObserver,
- public IceMX::ObserverT<IceMX::InvocationMetrics>
+class InvocationObserverI : public ObserverWithDelegateT<IceMX::InvocationMetrics,
+ Ice::Instrumentation::InvocationObserver>
{
public:
@@ -95,11 +197,15 @@ public:
const Ice::EndpointPtr&, Ice::Int, Ice::Int);
};
+typedef ObserverWithDelegateT<IceMX::Metrics, Ice::Instrumentation::Observer> ObserverI;
+
class ICE_API CommunicatorObserverI : public Ice::Instrumentation::CommunicatorObserver
{
public:
- CommunicatorObserverI(const IceInternal::MetricsAdminIPtr&);
+ CommunicatorObserverI(const IceInternal::MetricsAdminIPtr&,
+ const Ice::Instrumentation::CommunicatorObserverPtr& =
+ Ice::Instrumentation::CommunicatorObserverPtr());
virtual void setObserverUpdater(const Ice::Instrumentation::ObserverUpdaterPtr&);
@@ -126,16 +232,20 @@ public:
const IceInternal::MetricsAdminIPtr& getMetricsAdmin() const;
+ void destroy();
+
private:
- const IceInternal::MetricsAdminIPtr _metrics;
+ IceInternal::MetricsAdminIPtr _metrics;
+ Ice::LoggerPtr _logger;
+ const Ice::Instrumentation::CommunicatorObserverPtr _delegate;
- IceMX::ObserverFactoryT<ConnectionObserverI> _connections;
- IceMX::ObserverFactoryT<DispatchObserverI> _dispatch;
- IceMX::ObserverFactoryT<InvocationObserverI> _invocations;
- IceMX::ObserverFactoryT<ThreadObserverI> _threads;
- IceMX::ObserverFactoryT<IceMX::ObserverI> _connects;
- IceMX::ObserverFactoryT<IceMX::ObserverI> _endpointLookups;
+ ObserverFactoryWithDelegateT<ConnectionObserverI> _connections;
+ ObserverFactoryWithDelegateT<DispatchObserverI> _dispatch;
+ ObserverFactoryWithDelegateT<InvocationObserverI> _invocations;
+ ObserverFactoryWithDelegateT<ThreadObserverI> _threads;
+ ObserverFactoryWithDelegateT<ObserverI> _connects;
+ ObserverFactoryWithDelegateT<ObserverI> _endpointLookups;
};
typedef IceUtil::Handle<CommunicatorObserverI> CommunicatorObserverIPtr;
diff --git a/cpp/src/Ice/ObserverHelper.cpp b/cpp/src/Ice/ObserverHelper.cpp
index ca148bff6da..4215d77eabd 100644
--- a/cpp/src/Ice/ObserverHelper.cpp
+++ b/cpp/src/Ice/ObserverHelper.cpp
@@ -25,7 +25,7 @@ Ice::Context emptyCtx;
IceInternal::InvocationObserver::InvocationObserver(IceProxy::Ice::Object* proxy, const string& op, const Context* ctx)
{
- const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->getObserver();
if(!obsv)
{
return;
@@ -43,7 +43,7 @@ IceInternal::InvocationObserver::InvocationObserver(IceProxy::Ice::Object* proxy
IceInternal::InvocationObserver::InvocationObserver(IceInternal::Instance* instance, const string& op)
{
- const CommunicatorObserverPtr& obsv = instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = instance->getObserver();
if(!obsv)
{
return;
@@ -55,7 +55,7 @@ IceInternal::InvocationObserver::InvocationObserver(IceInternal::Instance* insta
void
IceInternal::InvocationObserver::attach(IceProxy::Ice::Object* proxy, const string& op, const Context* ctx)
{
- const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->getObserver();
if(!obsv)
{
return;
@@ -74,7 +74,7 @@ IceInternal::InvocationObserver::attach(IceProxy::Ice::Object* proxy, const stri
void
IceInternal::InvocationObserver::attach(IceInternal::Instance* instance, const string& op)
{
- const CommunicatorObserverPtr& obsv = instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = instance->getObserver();
if(!obsv)
{
return;
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 62652c786d2..2234e8c7ceb 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -1128,7 +1128,7 @@ void
IceInternal::ThreadPool::EventHandlerThread::updateObserver()
{
// Must be called with the thread pool mutex locked
- const CommunicatorObserverPtr& obsv = _pool->_instance->initializationData().observer;
+ const CommunicatorObserverPtr& obsv = _pool->_instance->getObserver();
if(obsv)
{
_observer.attach(obsv->getThreadObserver(_pool->_prefix, name(), _state, _observer.get()));
diff --git a/cpp/test/Ice/metrics/AllTests.cpp b/cpp/test/Ice/metrics/AllTests.cpp
index a181f43fbb1..ea097a349ac 100644
--- a/cpp/test/Ice/metrics/AllTests.cpp
+++ b/cpp/test/Ice/metrics/AllTests.cpp
@@ -9,6 +9,7 @@
#include <Ice/Ice.h>
#include <TestCommon.h>
+#include <InstrumentationI.h>
#include <Test.h>
using namespace std;
@@ -368,7 +369,7 @@ toMap(const IceMX::MetricsMap& mmap)
}
MetricsPrx
-allTests(const Ice::CommunicatorPtr& communicator)
+allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPtr& obsv)
{
MetricsPrx metrics = MetricsPrx::checkedCast(communicator->stringToProxy("metrics:default -p 12010"));
@@ -1047,5 +1048,39 @@ allTests(const Ice::CommunicatorPtr& communicator)
cout << "ok" << endl;
+ cout << "testing instrumentation observer delegate... " << flush;
+
+ test(obsv->threadObserver->total > 0);
+ test(obsv->connectionObserver->total > 0);
+ test(obsv->connectionEstablishmentObserver->total > 0);
+ test(obsv->endpointLookupObserver->total > 0);
+ test(obsv->dispatchObserver->total > 0);
+ test(obsv->invocationObserver->total > 0);
+ test(obsv->invocationObserver->remoteObserver->total > 0);
+
+ test(obsv->threadObserver->current > 0);
+ test(obsv->connectionObserver->current > 0);
+ test(obsv->connectionEstablishmentObserver->current == 0);
+ test(obsv->endpointLookupObserver->current == 0);
+ test(obsv->dispatchObserver->current == 0);
+ test(obsv->invocationObserver->current == 0);
+ test(obsv->invocationObserver->remoteObserver->current == 0);
+
+ test(obsv->threadObserver->failedCount == 0);
+ test(obsv->connectionObserver->failedCount > 0);
+ test(obsv->connectionEstablishmentObserver->failedCount > 0);
+ test(obsv->endpointLookupObserver->failedCount > 0);
+ //test(obsv->dispatchObserver->failedCount > 0);
+ test(obsv->invocationObserver->failedCount > 0);
+ test(obsv->invocationObserver->remoteObserver->failedCount > 0);
+
+ test(obsv->threadObserver->states > 0);
+ test(obsv->connectionObserver->received > 0 && obsv->connectionObserver->sent > 0);
+ //test(obsv->dispatchObserver->userExceptionCount > 0);
+ test(obsv->invocationObserver->userExceptionCount > 0 && obsv->invocationObserver->retriedCount > 0);
+ test(obsv->invocationObserver->remoteObserver->replySize > 0);
+
+ cout << "ok" << endl;
+
return metrics;
}
diff --git a/cpp/test/Ice/metrics/Client.cpp b/cpp/test/Ice/metrics/Client.cpp
index f30ad6032ad..4f541c2f7cc 100644
--- a/cpp/test/Ice/metrics/Client.cpp
+++ b/cpp/test/Ice/metrics/Client.cpp
@@ -10,6 +10,7 @@
#include <Ice/Ice.h>
#include <TestCommon.h>
#include <Test.h>
+#include <InstrumentationI.h>
DEFINE_TEST("client")
@@ -17,10 +18,10 @@ using namespace std;
using namespace Test;
int
-run(int, char**, const Ice::CommunicatorPtr& communicator)
+run(int, char**, const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPtr& observer)
{
- MetricsPrx allTests(const Ice::CommunicatorPtr&);
- MetricsPrx metrics = allTests(communicator);
+ MetricsPrx allTests(const Ice::CommunicatorPtr&, const CommunicatorObserverIPtr&);
+ MetricsPrx metrics = allTests(communicator, observer);
metrics->shutdown();
return EXIT_SUCCESS;
}
@@ -40,8 +41,10 @@ main(int argc, char* argv[])
initData.properties->setProperty("Ice.Admin.DelayCreation", "1");
initData.properties->setProperty("Ice.Warn.Connections", "0");
initData.properties->setProperty("Ice.MessageSizeMax", "50000");
+ CommunicatorObserverIPtr observer = new CommunicatorObserverI();
+ initData.observer = observer;
communicator = Ice::initialize(argc, argv, initData);
- status = run(argc, argv, communicator);
+ status = run(argc, argv, communicator, observer);
}
catch(const Ice::Exception& ex)
{
diff --git a/cpp/test/Ice/metrics/InstrumentationI.h b/cpp/test/Ice/metrics/InstrumentationI.h
new file mode 100644
index 00000000000..92ff7e1d11d
--- /dev/null
+++ b/cpp/test/Ice/metrics/InstrumentationI.h
@@ -0,0 +1,334 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#ifndef INSTRUMENTATION_I_H
+#define INSTRUMENTATION_I_H
+
+#include <Ice/Instrumentation.h>
+
+class ObserverI : virtual public Ice::Instrumentation::Observer, public IceUtil::Mutex
+{
+public:
+
+ virtual void
+ reset()
+ {
+ total = 0;
+ current = 0;
+ failedCount = 0;
+ }
+
+ virtual void
+ attach()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ++total;
+ ++current;
+ }
+ virtual void
+ detach()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ --current;
+ }
+ virtual void
+ failed(const std::string&)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ++failedCount;
+ }
+
+ Ice::Int total;
+ Ice::Int current;
+ Ice::Int failedCount;
+};
+
+class ConnectionObserverI : public Ice::Instrumentation::ConnectionObserver, public ObserverI
+{
+public:
+
+ virtual void
+ reset()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ObserverI::reset();
+ received = 0;
+ sent = 0;
+ }
+
+ virtual void
+ sentBytes(Ice::Int s)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ sent += s;
+ }
+
+ virtual void
+ receivedBytes(Ice::Int s)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ received += s;
+ }
+
+ Ice::Int sent;
+ Ice::Int received;
+};
+
+class ThreadObserverI : public Ice::Instrumentation::ThreadObserver, public ObserverI
+{
+public:
+
+ virtual void
+ reset()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ObserverI::reset();
+ states = 0;
+ }
+
+ virtual void
+ stateChanged(Ice::Instrumentation::ThreadState, Ice::Instrumentation::ThreadState)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ++states;
+ }
+
+ Ice::Int states;
+};
+
+class DispatchObserverI : public Ice::Instrumentation::DispatchObserver, public ObserverI
+{
+public:
+
+ virtual void reset()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ObserverI::reset();
+ userExceptionCount = 0;
+ replySize = 0;
+ }
+
+ virtual void
+ userException()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ++userExceptionCount;
+ }
+
+ virtual void
+ reply(Ice::Int s)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ replySize += s;
+ }
+
+ Ice::Int userExceptionCount;
+ Ice::Int replySize;
+};
+
+class RemoteObserverI : public Ice::Instrumentation::RemoteObserver, public ObserverI
+{
+public:
+
+ virtual void
+ reset()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ObserverI::reset();
+ replySize = 0;
+ }
+
+ virtual void
+ reply(Ice::Int s)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ replySize += s;
+ }
+
+ Ice::Int replySize;
+};
+
+class InvocationObserverI : public Ice::Instrumentation::InvocationObserver, public ObserverI
+{
+public:
+
+ virtual void reset()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ObserverI::reset();
+ retriedCount = 0;
+ userExceptionCount = 0;
+ if(remoteObserver)
+ {
+ remoteObserver->reset();
+ }
+ }
+
+ virtual void
+ retried()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ++retriedCount;
+ }
+
+ virtual void
+ userException()
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ ++userExceptionCount;
+ }
+
+ virtual Ice::Instrumentation::RemoteObserverPtr
+ getRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& e, Ice::Int, Ice::Int)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ if(!remoteObserver)
+ {
+ remoteObserver = new RemoteObserverI();
+ remoteObserver->reset();
+ }
+ return remoteObserver;
+ }
+
+ Ice::Int userExceptionCount;
+ Ice::Int retriedCount;
+
+ IceUtil::Handle<RemoteObserverI> remoteObserver;
+};
+
+class CommunicatorObserverI : public Ice::Instrumentation::CommunicatorObserver, public IceUtil::Mutex
+{
+public:
+
+ virtual void
+ setObserverUpdater(const Ice::Instrumentation::ObserverUpdaterPtr& u)
+ {
+ updater = u;
+ }
+
+ virtual Ice::Instrumentation::ObserverPtr
+ getConnectionEstablishmentObserver(const Ice::EndpointPtr&, const std::string&)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ if(!connectionEstablishmentObserver)
+ {
+ connectionEstablishmentObserver = new ObserverI();
+ connectionEstablishmentObserver->reset();
+ }
+ return connectionEstablishmentObserver;
+ }
+
+
+ virtual Ice::Instrumentation::ObserverPtr
+ getEndpointLookupObserver(const Ice::EndpointPtr&)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ if(!endpointLookupObserver)
+ {
+ endpointLookupObserver = new ObserverI();
+ endpointLookupObserver->reset();
+ }
+ return endpointLookupObserver;
+ }
+
+ virtual Ice::Instrumentation::ConnectionObserverPtr
+ getConnectionObserver(const Ice::ConnectionInfoPtr&,
+ const Ice::EndpointPtr&,
+ Ice::Instrumentation::ConnectionState,
+ const Ice::Instrumentation::ConnectionObserverPtr& old)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ test(!old || dynamic_cast<ConnectionObserverI*>(old.get()));
+ if(!connectionObserver)
+ {
+ connectionObserver = new ConnectionObserverI();
+ connectionObserver->reset();
+ }
+ return connectionObserver;
+ }
+
+ virtual Ice::Instrumentation::ThreadObserverPtr
+ getThreadObserver(const std::string&, const std::string&, Ice::Instrumentation::ThreadState,
+ const Ice::Instrumentation::ThreadObserverPtr& old)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ test(!old || dynamic_cast<ThreadObserverI*>(old.get()));
+ if(!threadObserver)
+ {
+ threadObserver = new ThreadObserverI();
+ threadObserver->reset();
+ }
+ return threadObserver;
+ }
+
+ virtual Ice::Instrumentation::InvocationObserverPtr
+ getInvocationObserver(const Ice::ObjectPrx&, const std::string&, const Ice::Context&)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ if(!invocationObserver)
+ {
+ invocationObserver = new InvocationObserverI();
+ invocationObserver->reset();
+ }
+ return invocationObserver;
+ }
+
+ virtual Ice::Instrumentation::DispatchObserverPtr
+ getDispatchObserver(const Ice::Current&, Ice::Int)
+ {
+ IceUtil::Mutex::Lock sync(*this);
+ if(!dispatchObserver)
+ {
+ dispatchObserver = new DispatchObserverI();
+ dispatchObserver->reset();
+ }
+ return dispatchObserver;
+ }
+
+ void reset()
+ {
+ if(connectionEstablishmentObserver)
+ {
+ connectionEstablishmentObserver->reset();
+ }
+ if(endpointLookupObserver)
+ {
+ endpointLookupObserver->reset();
+ }
+ if(connectionObserver)
+ {
+ connectionObserver->reset();
+ }
+ if(threadObserver)
+ {
+ threadObserver->reset();
+ }
+ if(invocationObserver)
+ {
+ invocationObserver->reset();
+ }
+ if(dispatchObserver)
+ {
+ dispatchObserver->reset();
+ }
+ }
+
+ Ice::Instrumentation::ObserverUpdaterPtr updater;
+
+ IceUtil::Handle<ObserverI> connectionEstablishmentObserver;
+ IceUtil::Handle<ObserverI> endpointLookupObserver;
+ IceUtil::Handle<ConnectionObserverI> connectionObserver;
+ IceUtil::Handle<ThreadObserverI> threadObserver;
+ IceUtil::Handle<InvocationObserverI> invocationObserver;
+ IceUtil::Handle<DispatchObserverI> dispatchObserver;
+};
+
+typedef IceUtil::Handle<CommunicatorObserverI> CommunicatorObserverIPtr;
+
+#endif