diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/include/Ice/MetricsObserverI.h | 32 | ||||
-rw-r--r-- | cpp/src/Ice/CommunicatorI.cpp | 2 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 4 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 20 | ||||
-rw-r--r-- | cpp/src/Ice/EndpointI.cpp | 6 | ||||
-rw-r--r-- | cpp/src/Ice/Incoming.cpp | 2 | ||||
-rw-r--r-- | cpp/src/Ice/Instance.cpp | 28 | ||||
-rw-r--r-- | cpp/src/Ice/Instance.h | 6 | ||||
-rw-r--r-- | cpp/src/Ice/InstrumentationI.cpp | 120 | ||||
-rw-r--r-- | cpp/src/Ice/InstrumentationI.h | 146 | ||||
-rw-r--r-- | cpp/src/Ice/ObserverHelper.cpp | 8 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 2 | ||||
-rw-r--r-- | cpp/test/Ice/metrics/AllTests.cpp | 37 | ||||
-rw-r--r-- | cpp/test/Ice/metrics/Client.cpp | 11 | ||||
-rw-r--r-- | cpp/test/Ice/metrics/InstrumentationI.h | 334 |
15 files changed, 680 insertions, 78 deletions
diff --git a/cpp/include/Ice/MetricsObserverI.h b/cpp/include/Ice/MetricsObserverI.h index 797e9fcbf83..1ea304df7c3 100644 --- a/cpp/include/Ice/MetricsObserverI.h +++ b/cpp/include/Ice/MetricsObserverI.h @@ -430,10 +430,6 @@ public: _metrics->registerMap<MetricsType>(name, this); } - ObserverFactoryT(const std::string& name) : _name(name) - { - } - ~ObserverFactoryT() { if(_metrics) @@ -446,6 +442,10 @@ public: getObserver(const MetricsHelperT<MetricsType>& helper) { IceUtil::Mutex::Lock sync(*this); + if(!_metrics) + { + return 0; + } typename ObserverImplType::EntrySeqType metricsObjects; for(typename MetricsMapSeqType::const_iterator p = _maps.begin(); p != _maps.end(); ++p) @@ -470,13 +470,18 @@ public: template<typename ObserverPtrType> ObserverImplPtrType getObserver(const MetricsHelperT<MetricsType>& helper, const ObserverPtrType& observer) { - if(!observer) + ObserverImplPtrType old = ObserverImplPtrType::dynamicCast(observer); + if(!observer || !old) { return getObserver(helper); } IceUtil::Mutex::Lock sync(*this); - ObserverImplPtrType old = ObserverImplPtrType::dynamicCast(observer); + if(!_metrics) + { + return 0; + } + typename ObserverImplType::EntrySeqType metricsObjects; for(typename MetricsMapSeqType::const_iterator p = _maps.begin(); p != _maps.end(); ++p) { @@ -501,6 +506,7 @@ public: template<typename SubMapMetricsType> void registerSubMap(const std::string& subMap, MetricsMap MetricsType::* member) { + assert(_metrics); _metrics->registerSubMap<SubMapMetricsType>(_name, subMap, member); } @@ -514,6 +520,11 @@ public: UpdaterPtr updater; { IceUtil::Mutex::Lock sync(*this); + if(!_metrics) + { + return; + } + std::vector<IceInternal::MetricsMapIPtr> maps = _metrics->getMaps(_name); _maps.clear(); for(std::vector<IceInternal::MetricsMapIPtr>::const_iterator p = maps.begin(); p != maps.end(); ++p) @@ -537,9 +548,16 @@ public: _updater = updater; } + void destroy() + { + IceUtil::Mutex::Lock sync(*this); + _metrics = 0; + _maps.clear(); + } + private: - const IceInternal::MetricsAdminIPtr _metrics; + IceInternal::MetricsAdminIPtr _metrics; const std::string _name; MetricsMapSeqType _maps; volatile bool _enabled; diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp index f3e17c4dd2d..4646479d978 100644 --- a/cpp/src/Ice/CommunicatorI.cpp +++ b/cpp/src/Ice/CommunicatorI.cpp @@ -261,7 +261,7 @@ Ice::CommunicatorI::getStats() const Ice::Instrumentation::CommunicatorObserverPtr Ice::CommunicatorI::getObserver() const { - return _instance->initializationData().observer; + return _instance->getObserver(); } RouterPrx diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 60534489cd9..822e26b84ec 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -225,7 +225,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // Try to establish the connection to the connectors. // DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); - const CommunicatorObserverPtr& obsv = _instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = _instance->getObserver(); vector<ConnectorInfo>::const_iterator q; for(q = connectors.begin(); q != connectors.end(); ++q) { @@ -1122,7 +1122,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector() try { - const CommunicatorObserverPtr& obsv = _factory->_instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = _factory->_instance->getObserver(); if(obsv) { _observer = obsv->getConnectionEstablishmentObserver(_iter->endpoint, _iter->connector->toString()); diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 8091d2f2233..fbf9b2ea5f6 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -525,11 +525,11 @@ Ice::ConnectionI::updateObserver() return; } - assert(_instance->initializationData().observer); - _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), - _endpoint, - toConnectionState(_state), - _observer.get())); + assert(_instance->getObserver()); + _observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(), + _endpoint, + toConnectionState(_state), + _observer.get())); } void @@ -2188,16 +2188,16 @@ Ice::ConnectionI::setState(State state) } } - if(_instance->initializationData().observer) + if(_instance->getObserver()) { ConnectionState oldState = toConnectionState(_state); ConnectionState newState = toConnectionState(state); if(oldState != newState) { - _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), - _endpoint, - newState, - _observer.get())); + _observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(), + _endpoint, + newState, + _observer.get())); } if(_observer && state == StateClosed && _exception.get()) { diff --git a/cpp/src/Ice/EndpointI.cpp b/cpp/src/Ice/EndpointI.cpp index a27c4082faf..7d70754021b 100644 --- a/cpp/src/Ice/EndpointI.cpp +++ b/cpp/src/Ice/EndpointI.cpp @@ -139,7 +139,7 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En } ObserverHelperT<> observer; - const CommunicatorObserverPtr& obsv = _instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = _instance->getObserver(); if(obsv) { observer.attach(obsv->getEndpointLookupObserver(endpoint)); @@ -201,7 +201,7 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En entry.endpoint = endpoint; entry.callback = callback; - const CommunicatorObserverPtr& obsv = _instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = _instance->getObserver(); if(obsv) { entry.observer = obsv->getEndpointLookupObserver(endpoint); @@ -310,7 +310,7 @@ void IceInternal::EndpointHostResolver::updateObserver() { Lock sync(*this); - const CommunicatorObserverPtr& obsv = _instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = _instance->getObserver(); if(obsv) { _observer.attach(obsv->getThreadObserver("Communicator", name(), ThreadStateIdle, _observer.get())); diff --git a/cpp/src/Ice/Incoming.cpp b/cpp/src/Ice/Incoming.cpp index 928c3c50088..e516940c84f 100644 --- a/cpp/src/Ice/Incoming.cpp +++ b/cpp/src/Ice/Incoming.cpp @@ -592,7 +592,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre _current.ctx.insert(_current.ctx.end(), pr); } - const CommunicatorObserverPtr& obsv = _is->instance()->initializationData().observer; + const CommunicatorObserverPtr& obsv = _is->instance()->getObserver(); if(obsv) { // Read the parameter encapsulation size. diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index f5a61cb9995..19fa9701352 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -1148,18 +1148,20 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi // Setup the communicator observer only if the user didn't already set an // Ice observer resolver and if the admininistrative endpoints are set. // - if(!_initData.observer && - (_adminFacetFilter.empty() || _adminFacetFilter.find("Metrics") != _adminFacetFilter.end()) && + if((_adminFacetFilter.empty() || _adminFacetFilter.find("Metrics") != _adminFacetFilter.end()) && _initData.properties->getProperty("Ice.Admin.Endpoints") != "") { - CommunicatorObserverIPtr observer = new CommunicatorObserverI(_metricsAdmin); - _initData.observer = observer; + _observer = new CommunicatorObserverI(_metricsAdmin, _initData.observer); // // Make sure the admin plugin receives property updates. // props->addUpdateCallback(_metricsAdmin); } + else + { + _observer = _initData.observer; + } __setNoDelete(false); } @@ -1229,10 +1231,10 @@ IceInternal::Instance::finishSetup(int& argc, char* argv[]) // // Set observer updater // - if(_initData.observer) + if(_observer) { - theCollector->updateObserver(_initData.observer); - _initData.observer->setObserverUpdater(new ObserverUpdaterI(this)); + theCollector->updateObserver(_observer); + _observer->setObserverUpdater(new ObserverUpdaterI(this)); } // @@ -1398,18 +1400,20 @@ IceInternal::Instance::destroy() _retryQueue->destroy(); } - if(_initData.observer && theCollector) + if(_observer && theCollector) { - theCollector->clearObserver(_initData.observer); + theCollector->clearObserver(_observer); } if(_metricsAdmin) { _metricsAdmin->destroy(); _metricsAdmin = 0; - if(CommunicatorObserverIPtr::dynamicCast(_initData.observer)) + + // Break cyclic reference counts. Don't clear _observer, it's immutable. + if(_observer) { - _initData.observer = 0; // Clear cyclic reference counts. + CommunicatorObserverIPtr::dynamicCast(_observer)->destroy(); } } @@ -1567,7 +1571,7 @@ IceInternal::Instance::updateThreadObservers() _endpointHostResolver->updateObserver(); } assert(theCollector); - theCollector->updateObserver(_initData.observer); + theCollector->updateObserver(_observer); } catch(const Ice::CommunicatorDestroyedException&) { diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h index 16aa25648b0..34261196016 100644 --- a/cpp/src/Ice/Instance.h +++ b/cpp/src/Ice/Instance.h @@ -93,6 +93,11 @@ public: void addAdminFacet(const Ice::ObjectPtr&, const std::string&); Ice::ObjectPtr removeAdminFacet(const std::string&); Ice::ObjectPtr findAdminFacet(const std::string&); + + const Ice::Instrumentation::CommunicatorObserverPtr& getObserver() const + { + return _observer; + } const Ice::ImplicitContextIPtr& getImplicitContext() const { @@ -157,6 +162,7 @@ private: Ice::Identity _adminIdentity; std::set<std::string> _adminFacetFilter; IceInternal::MetricsAdminIPtr _metricsAdmin; + Ice::Instrumentation::CommunicatorObserverPtr _observer; }; class ProcessI : public Ice::Process diff --git a/cpp/src/Ice/InstrumentationI.cpp b/cpp/src/Ice/InstrumentationI.cpp index 82598bbf1dc..fcfae4200a7 100644 --- a/cpp/src/Ice/InstrumentationI.cpp +++ b/cpp/src/Ice/InstrumentationI.cpp @@ -694,48 +694,81 @@ void ConnectionObserverI::sentBytes(Int num) { forEach(add(&ConnectionMetrics::sentBytes, num)); + if(_delegate) + { + _delegate->sentBytes(num); + } } void ConnectionObserverI::receivedBytes(Int num) { forEach(add(&ConnectionMetrics::receivedBytes, num)); + if(_delegate) + { + _delegate->receivedBytes(num); + } } void ThreadObserverI::stateChanged(ThreadState oldState, ThreadState newState) { forEach(ThreadStateChanged(oldState, newState)); + if(_delegate) + { + _delegate->stateChanged(oldState, newState); + } + } void DispatchObserverI::userException() { forEach(inc(&DispatchMetrics::userException)); + if(_delegate) + { + _delegate->userException(); + } } void DispatchObserverI::reply(Int size) { forEach(add(&DispatchMetrics::replySize, size)); + if(_delegate) + { + _delegate->reply(size); + } } void RemoteObserverI::reply(Int size) { forEach(add(&RemoteMetrics::replySize, size)); + if(_delegate) + { + _delegate->reply(size); + } } void InvocationObserverI::retried() { forEach(inc(&InvocationMetrics::retry)); + if(_delegate) + { + _delegate->retried(); + } } void InvocationObserverI::userException() { forEach(inc(&InvocationMetrics::userException)); + if(_delegate) + { + _delegate->userException(); + } } RemoteObserverPtr @@ -746,7 +779,14 @@ InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection, { try { - return getObserver<RemoteObserverI>("Remote", RemoteInvocationHelper(connection, endpoint, requestId, size)); + RemoteObserverPtr delegate; + if(_delegate) + { + delegate = _delegate->getRemoteObserver(connection, endpoint, requestId, size); + } + return getObserver<RemoteObserverI>("Remote", + RemoteInvocationHelper(connection, endpoint, requestId, size), + delegate); } catch(const exception&) { @@ -754,8 +794,11 @@ InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection, return 0; } -CommunicatorObserverI::CommunicatorObserverI(const IceInternal::MetricsAdminIPtr& metrics) : +CommunicatorObserverI::CommunicatorObserverI(const IceInternal::MetricsAdminIPtr& metrics, + const Ice::Instrumentation::CommunicatorObserverPtr& delegate) : _metrics(metrics), + _logger(metrics->getLogger()), + _delegate(delegate), _connections(metrics, "Connection"), _dispatch(metrics, "Dispatch"), _invocations(metrics, "Invocation"), @@ -771,6 +814,10 @@ CommunicatorObserverI::setObserverUpdater(const ObserverUpdaterPtr& updater) { _connections.setUpdater(newUpdater(updater, &ObserverUpdater::updateConnectionObservers)); _threads.setUpdater(newUpdater(updater, &ObserverUpdater::updateThreadObservers)); + if(_delegate) + { + _delegate->setObserverUpdater(updater); + } } ObserverPtr @@ -780,11 +827,16 @@ CommunicatorObserverI::getConnectionEstablishmentObserver(const EndpointPtr& end { try { - return _connects.getObserver(EndpointHelper(endpt, connector)); + ObserverPtr delegate; + if(_delegate) + { + delegate = _delegate->getConnectionEstablishmentObserver(endpt, connector); + } + return _connects.getObserver(EndpointHelper(endpt, connector), delegate); } catch(const exception& ex) { - Error error(_metrics->getLogger()); + Error error(_logger); error << "unexpected exception trying to obtain observer:\n" << ex; } } @@ -798,11 +850,16 @@ CommunicatorObserverI::getEndpointLookupObserver(const EndpointPtr& endpt) { try { - return _endpointLookups.getObserver(EndpointHelper(endpt)); + ObserverPtr delegate; + if(_delegate) + { + delegate = _delegate->getEndpointLookupObserver(endpt); + } + return _endpointLookups.getObserver(EndpointHelper(endpt), delegate); } catch(const exception& ex) { - Error error(_metrics->getLogger()); + Error error(_logger); error << "unexpected exception trying to obtain observer:\n" << ex; } } @@ -819,11 +876,17 @@ CommunicatorObserverI::getConnectionObserver(const ConnectionInfoPtr& con, { try { - return _connections.getObserver(ConnectionHelper(con, endpt, state), observer); + ConnectionObserverPtr delegate; + ConnectionObserverI* o = dynamic_cast<ConnectionObserverI*>(observer.get()); + if(_delegate) + { + delegate = _delegate->getConnectionObserver(con, endpt, state, o ? o->getDelegate() : observer); + } + return _connections.getObserver(ConnectionHelper(con, endpt, state), delegate, observer); } catch(const exception& ex) { - Error error(_metrics->getLogger()); + Error error(_logger); error << "unexpected exception trying to obtain observer:\n" << ex; } } @@ -840,11 +903,17 @@ CommunicatorObserverI::getThreadObserver(const string& parent, { try { - return _threads.getObserver(ThreadHelper(parent, id, state), observer); + ThreadObserverPtr delegate; + ThreadObserverI* o = dynamic_cast<ThreadObserverI*>(observer.get()); + if(_delegate) + { + delegate = _delegate->getThreadObserver(parent, id, state, o ? o->getDelegate() : observer); + } + return _threads.getObserver(ThreadHelper(parent, id, state), delegate, observer); } catch(const exception& ex) { - Error error(_metrics->getLogger()); + Error error(_logger); error << "unexpected exception trying to obtain observer:\n" << ex; } } @@ -858,11 +927,16 @@ CommunicatorObserverI::getInvocationObserver(const ObjectPrx& proxy, const strin { try { - return _invocations.getObserver(InvocationHelper(proxy, op, ctx)); + InvocationObserverPtr delegate; + if(_delegate) + { + delegate = _delegate->getInvocationObserver(proxy, op, ctx); + } + return _invocations.getObserver(InvocationHelper(proxy, op, ctx), delegate); } catch(const exception& ex) { - Error error(_metrics->getLogger()); + Error error(_logger); error << "unexpected exception trying to obtain observer:\n" << ex; } } @@ -876,11 +950,16 @@ CommunicatorObserverI::getDispatchObserver(const Current& current, int size) { try { - return _dispatch.getObserver(DispatchHelper(current, size)); + DispatchObserverPtr delegate; + if(_delegate) + { + delegate = _delegate->getDispatchObserver(current, size); + } + return _dispatch.getObserver(DispatchHelper(current, size), delegate); } catch(const exception& ex) { - Error error(_metrics->getLogger()); + Error error(_logger); error << "unexpected exception trying to obtain observer:\n" << ex; } } @@ -890,5 +969,18 @@ CommunicatorObserverI::getDispatchObserver(const Current& current, int size) const IceInternal::MetricsAdminIPtr& CommunicatorObserverI::getMetricsAdmin() const { + assert(_metrics); return _metrics; } + +void +CommunicatorObserverI::destroy() +{ + _metrics = 0; + _connections.destroy(); + _dispatch.destroy(); + _invocations.destroy(); + _threads.destroy(); + _connects.destroy(); + _endpointLookups.destroy(); +} diff --git a/cpp/src/Ice/InstrumentationI.h b/cpp/src/Ice/InstrumentationI.h index 9f6b61327b9..1db093feae2 100644 --- a/cpp/src/Ice/InstrumentationI.h +++ b/cpp/src/Ice/InstrumentationI.h @@ -16,6 +16,109 @@ namespace IceInternal { +template<typename T, typename O> class ObserverWithDelegateT : public IceMX::ObserverT<T>, virtual public O +{ +public: + + typedef O ObserverType; + typedef typename IceInternal::Handle<O> ObserverPtrType; + + virtual void + attach() + { + IceMX::ObserverT<T>::attach(); + if(_delegate) + { + _delegate->attach(); + } + } + + virtual void + detach() + { + IceMX::ObserverT<T>::detach(); + if(_delegate) + { + _delegate->detach(); + } + } + + virtual void + failed(const std::string& exceptionName) + { + IceMX::ObserverT<T>::failed(exceptionName); + if(_delegate) + { + _delegate->failed(exceptionName); + } + } + + ObserverPtrType + getDelegate() const + { + return _delegate; + } + + void + setDelegate(ObserverPtrType delegate) + { + _delegate = delegate; + } + + template<typename ObserverImpl, typename ObserverMetricsType, typename ObserverPtrType> ObserverPtrType + getObserver(const std::string& mapName, const IceMX::MetricsHelperT<ObserverMetricsType>& helper, + const ObserverPtrType& del) + { + IceInternal::Handle<ObserverImpl> obsv = IceMX::ObserverT<T>::template getObserver<ObserverImpl>(mapName, + helper); + if(obsv) + { + obsv->setDelegate(del); + return obsv; + } + return del; + } + +protected: + + ObserverPtrType _delegate; +}; + +template<typename T> class ObserverFactoryWithDelegateT : public IceMX::ObserverFactoryT<T> +{ +public: + + ObserverFactoryWithDelegateT(const IceInternal::MetricsAdminIPtr& metrics, const std::string& name) : + IceMX::ObserverFactoryT<T>(metrics, name) + { + } + + template<typename ObserverMetricsType, typename ObserverPtrType> ObserverPtrType + getObserver(const IceMX::MetricsHelperT<ObserverMetricsType>& helper, const ObserverPtrType& del) + { + IceInternal::Handle<T> obsv = IceMX::ObserverFactoryT<T>::getObserver(helper); + if(obsv) + { + obsv->setDelegate(del); + return obsv; + } + return del; + } + + template<typename ObserverMetricsType, typename ObserverPtrType> ObserverPtrType + getObserver(const IceMX::MetricsHelperT<ObserverMetricsType>& helper, const ObserverPtrType& del, + const ObserverPtrType& old) + { + IceInternal::Handle<T> obsv = IceMX::ObserverFactoryT<T>::getObserver(helper, old); + if(obsv) + { + obsv->setDelegate(del); + return obsv; + } + return del; + } +}; + template<typename Helper> void addEndpointAttributes(typename Helper::Attributes& attrs) { @@ -49,8 +152,8 @@ void addConnectionAttributes(typename Helper::Attributes& attrs) addEndpointAttributes<Helper>(attrs); } -class ConnectionObserverI : public Ice::Instrumentation::ConnectionObserver, - public IceMX::ObserverT<IceMX::ConnectionMetrics> +class ConnectionObserverI : public ObserverWithDelegateT<IceMX::ConnectionMetrics, + Ice::Instrumentation::ConnectionObserver> { public: @@ -58,16 +161,14 @@ public: virtual void receivedBytes(Ice::Int); }; -class ThreadObserverI : public Ice::Instrumentation::ThreadObserver, - public IceMX::ObserverT<IceMX::ThreadMetrics> +class ThreadObserverI : public ObserverWithDelegateT<IceMX::ThreadMetrics, Ice::Instrumentation::ThreadObserver> { public: virtual void stateChanged(Ice::Instrumentation::ThreadState, Ice::Instrumentation::ThreadState); }; -class DispatchObserverI : public Ice::Instrumentation::DispatchObserver, - public IceMX::ObserverT<IceMX::DispatchMetrics> +class DispatchObserverI : public ObserverWithDelegateT<IceMX::DispatchMetrics, Ice::Instrumentation::DispatchObserver> { public: @@ -76,14 +177,15 @@ public: virtual void reply(Ice::Int); }; -class RemoteObserverI : public Ice::Instrumentation::RemoteObserver, - public IceMX::ObserverT<IceMX::RemoteMetrics> +class RemoteObserverI : public ObserverWithDelegateT<IceMX::RemoteMetrics, Ice::Instrumentation::RemoteObserver> { +public: + virtual void reply(Ice::Int); }; -class InvocationObserverI : public Ice::Instrumentation::InvocationObserver, - public IceMX::ObserverT<IceMX::InvocationMetrics> +class InvocationObserverI : public ObserverWithDelegateT<IceMX::InvocationMetrics, + Ice::Instrumentation::InvocationObserver> { public: @@ -95,11 +197,15 @@ public: const Ice::EndpointPtr&, Ice::Int, Ice::Int); }; +typedef ObserverWithDelegateT<IceMX::Metrics, Ice::Instrumentation::Observer> ObserverI; + class ICE_API CommunicatorObserverI : public Ice::Instrumentation::CommunicatorObserver { public: - CommunicatorObserverI(const IceInternal::MetricsAdminIPtr&); + CommunicatorObserverI(const IceInternal::MetricsAdminIPtr&, + const Ice::Instrumentation::CommunicatorObserverPtr& = + Ice::Instrumentation::CommunicatorObserverPtr()); virtual void setObserverUpdater(const Ice::Instrumentation::ObserverUpdaterPtr&); @@ -126,16 +232,20 @@ public: const IceInternal::MetricsAdminIPtr& getMetricsAdmin() const; + void destroy(); + private: - const IceInternal::MetricsAdminIPtr _metrics; + IceInternal::MetricsAdminIPtr _metrics; + Ice::LoggerPtr _logger; + const Ice::Instrumentation::CommunicatorObserverPtr _delegate; - IceMX::ObserverFactoryT<ConnectionObserverI> _connections; - IceMX::ObserverFactoryT<DispatchObserverI> _dispatch; - IceMX::ObserverFactoryT<InvocationObserverI> _invocations; - IceMX::ObserverFactoryT<ThreadObserverI> _threads; - IceMX::ObserverFactoryT<IceMX::ObserverI> _connects; - IceMX::ObserverFactoryT<IceMX::ObserverI> _endpointLookups; + ObserverFactoryWithDelegateT<ConnectionObserverI> _connections; + ObserverFactoryWithDelegateT<DispatchObserverI> _dispatch; + ObserverFactoryWithDelegateT<InvocationObserverI> _invocations; + ObserverFactoryWithDelegateT<ThreadObserverI> _threads; + ObserverFactoryWithDelegateT<ObserverI> _connects; + ObserverFactoryWithDelegateT<ObserverI> _endpointLookups; }; typedef IceUtil::Handle<CommunicatorObserverI> CommunicatorObserverIPtr; diff --git a/cpp/src/Ice/ObserverHelper.cpp b/cpp/src/Ice/ObserverHelper.cpp index ca148bff6da..4215d77eabd 100644 --- a/cpp/src/Ice/ObserverHelper.cpp +++ b/cpp/src/Ice/ObserverHelper.cpp @@ -25,7 +25,7 @@ Ice::Context emptyCtx; IceInternal::InvocationObserver::InvocationObserver(IceProxy::Ice::Object* proxy, const string& op, const Context* ctx) { - const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->initializationData().observer; + const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->getObserver(); if(!obsv) { return; @@ -43,7 +43,7 @@ IceInternal::InvocationObserver::InvocationObserver(IceProxy::Ice::Object* proxy IceInternal::InvocationObserver::InvocationObserver(IceInternal::Instance* instance, const string& op) { - const CommunicatorObserverPtr& obsv = instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = instance->getObserver(); if(!obsv) { return; @@ -55,7 +55,7 @@ IceInternal::InvocationObserver::InvocationObserver(IceInternal::Instance* insta void IceInternal::InvocationObserver::attach(IceProxy::Ice::Object* proxy, const string& op, const Context* ctx) { - const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->initializationData().observer; + const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->getObserver(); if(!obsv) { return; @@ -74,7 +74,7 @@ IceInternal::InvocationObserver::attach(IceProxy::Ice::Object* proxy, const stri void IceInternal::InvocationObserver::attach(IceInternal::Instance* instance, const string& op) { - const CommunicatorObserverPtr& obsv = instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = instance->getObserver(); if(!obsv) { return; diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 62652c786d2..2234e8c7ceb 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -1128,7 +1128,7 @@ void IceInternal::ThreadPool::EventHandlerThread::updateObserver() { // Must be called with the thread pool mutex locked - const CommunicatorObserverPtr& obsv = _pool->_instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = _pool->_instance->getObserver(); if(obsv) { _observer.attach(obsv->getThreadObserver(_pool->_prefix, name(), _state, _observer.get())); diff --git a/cpp/test/Ice/metrics/AllTests.cpp b/cpp/test/Ice/metrics/AllTests.cpp index a181f43fbb1..ea097a349ac 100644 --- a/cpp/test/Ice/metrics/AllTests.cpp +++ b/cpp/test/Ice/metrics/AllTests.cpp @@ -9,6 +9,7 @@ #include <Ice/Ice.h> #include <TestCommon.h> +#include <InstrumentationI.h> #include <Test.h> using namespace std; @@ -368,7 +369,7 @@ toMap(const IceMX::MetricsMap& mmap) } MetricsPrx -allTests(const Ice::CommunicatorPtr& communicator) +allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPtr& obsv) { MetricsPrx metrics = MetricsPrx::checkedCast(communicator->stringToProxy("metrics:default -p 12010")); @@ -1047,5 +1048,39 @@ allTests(const Ice::CommunicatorPtr& communicator) cout << "ok" << endl; + cout << "testing instrumentation observer delegate... " << flush; + + test(obsv->threadObserver->total > 0); + test(obsv->connectionObserver->total > 0); + test(obsv->connectionEstablishmentObserver->total > 0); + test(obsv->endpointLookupObserver->total > 0); + test(obsv->dispatchObserver->total > 0); + test(obsv->invocationObserver->total > 0); + test(obsv->invocationObserver->remoteObserver->total > 0); + + test(obsv->threadObserver->current > 0); + test(obsv->connectionObserver->current > 0); + test(obsv->connectionEstablishmentObserver->current == 0); + test(obsv->endpointLookupObserver->current == 0); + test(obsv->dispatchObserver->current == 0); + test(obsv->invocationObserver->current == 0); + test(obsv->invocationObserver->remoteObserver->current == 0); + + test(obsv->threadObserver->failedCount == 0); + test(obsv->connectionObserver->failedCount > 0); + test(obsv->connectionEstablishmentObserver->failedCount > 0); + test(obsv->endpointLookupObserver->failedCount > 0); + //test(obsv->dispatchObserver->failedCount > 0); + test(obsv->invocationObserver->failedCount > 0); + test(obsv->invocationObserver->remoteObserver->failedCount > 0); + + test(obsv->threadObserver->states > 0); + test(obsv->connectionObserver->received > 0 && obsv->connectionObserver->sent > 0); + //test(obsv->dispatchObserver->userExceptionCount > 0); + test(obsv->invocationObserver->userExceptionCount > 0 && obsv->invocationObserver->retriedCount > 0); + test(obsv->invocationObserver->remoteObserver->replySize > 0); + + cout << "ok" << endl; + return metrics; } diff --git a/cpp/test/Ice/metrics/Client.cpp b/cpp/test/Ice/metrics/Client.cpp index f30ad6032ad..4f541c2f7cc 100644 --- a/cpp/test/Ice/metrics/Client.cpp +++ b/cpp/test/Ice/metrics/Client.cpp @@ -10,6 +10,7 @@ #include <Ice/Ice.h> #include <TestCommon.h> #include <Test.h> +#include <InstrumentationI.h> DEFINE_TEST("client") @@ -17,10 +18,10 @@ using namespace std; using namespace Test; int -run(int, char**, const Ice::CommunicatorPtr& communicator) +run(int, char**, const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPtr& observer) { - MetricsPrx allTests(const Ice::CommunicatorPtr&); - MetricsPrx metrics = allTests(communicator); + MetricsPrx allTests(const Ice::CommunicatorPtr&, const CommunicatorObserverIPtr&); + MetricsPrx metrics = allTests(communicator, observer); metrics->shutdown(); return EXIT_SUCCESS; } @@ -40,8 +41,10 @@ main(int argc, char* argv[]) initData.properties->setProperty("Ice.Admin.DelayCreation", "1"); initData.properties->setProperty("Ice.Warn.Connections", "0"); initData.properties->setProperty("Ice.MessageSizeMax", "50000"); + CommunicatorObserverIPtr observer = new CommunicatorObserverI(); + initData.observer = observer; communicator = Ice::initialize(argc, argv, initData); - status = run(argc, argv, communicator); + status = run(argc, argv, communicator, observer); } catch(const Ice::Exception& ex) { diff --git a/cpp/test/Ice/metrics/InstrumentationI.h b/cpp/test/Ice/metrics/InstrumentationI.h new file mode 100644 index 00000000000..92ff7e1d11d --- /dev/null +++ b/cpp/test/Ice/metrics/InstrumentationI.h @@ -0,0 +1,334 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef INSTRUMENTATION_I_H +#define INSTRUMENTATION_I_H + +#include <Ice/Instrumentation.h> + +class ObserverI : virtual public Ice::Instrumentation::Observer, public IceUtil::Mutex +{ +public: + + virtual void + reset() + { + total = 0; + current = 0; + failedCount = 0; + } + + virtual void + attach() + { + IceUtil::Mutex::Lock sync(*this); + ++total; + ++current; + } + virtual void + detach() + { + IceUtil::Mutex::Lock sync(*this); + --current; + } + virtual void + failed(const std::string&) + { + IceUtil::Mutex::Lock sync(*this); + ++failedCount; + } + + Ice::Int total; + Ice::Int current; + Ice::Int failedCount; +}; + +class ConnectionObserverI : public Ice::Instrumentation::ConnectionObserver, public ObserverI +{ +public: + + virtual void + reset() + { + IceUtil::Mutex::Lock sync(*this); + ObserverI::reset(); + received = 0; + sent = 0; + } + + virtual void + sentBytes(Ice::Int s) + { + IceUtil::Mutex::Lock sync(*this); + sent += s; + } + + virtual void + receivedBytes(Ice::Int s) + { + IceUtil::Mutex::Lock sync(*this); + received += s; + } + + Ice::Int sent; + Ice::Int received; +}; + +class ThreadObserverI : public Ice::Instrumentation::ThreadObserver, public ObserverI +{ +public: + + virtual void + reset() + { + IceUtil::Mutex::Lock sync(*this); + ObserverI::reset(); + states = 0; + } + + virtual void + stateChanged(Ice::Instrumentation::ThreadState, Ice::Instrumentation::ThreadState) + { + IceUtil::Mutex::Lock sync(*this); + ++states; + } + + Ice::Int states; +}; + +class DispatchObserverI : public Ice::Instrumentation::DispatchObserver, public ObserverI +{ +public: + + virtual void reset() + { + IceUtil::Mutex::Lock sync(*this); + ObserverI::reset(); + userExceptionCount = 0; + replySize = 0; + } + + virtual void + userException() + { + IceUtil::Mutex::Lock sync(*this); + ++userExceptionCount; + } + + virtual void + reply(Ice::Int s) + { + IceUtil::Mutex::Lock sync(*this); + replySize += s; + } + + Ice::Int userExceptionCount; + Ice::Int replySize; +}; + +class RemoteObserverI : public Ice::Instrumentation::RemoteObserver, public ObserverI +{ +public: + + virtual void + reset() + { + IceUtil::Mutex::Lock sync(*this); + ObserverI::reset(); + replySize = 0; + } + + virtual void + reply(Ice::Int s) + { + IceUtil::Mutex::Lock sync(*this); + replySize += s; + } + + Ice::Int replySize; +}; + +class InvocationObserverI : public Ice::Instrumentation::InvocationObserver, public ObserverI +{ +public: + + virtual void reset() + { + IceUtil::Mutex::Lock sync(*this); + ObserverI::reset(); + retriedCount = 0; + userExceptionCount = 0; + if(remoteObserver) + { + remoteObserver->reset(); + } + } + + virtual void + retried() + { + IceUtil::Mutex::Lock sync(*this); + ++retriedCount; + } + + virtual void + userException() + { + IceUtil::Mutex::Lock sync(*this); + ++userExceptionCount; + } + + virtual Ice::Instrumentation::RemoteObserverPtr + getRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& e, Ice::Int, Ice::Int) + { + IceUtil::Mutex::Lock sync(*this); + if(!remoteObserver) + { + remoteObserver = new RemoteObserverI(); + remoteObserver->reset(); + } + return remoteObserver; + } + + Ice::Int userExceptionCount; + Ice::Int retriedCount; + + IceUtil::Handle<RemoteObserverI> remoteObserver; +}; + +class CommunicatorObserverI : public Ice::Instrumentation::CommunicatorObserver, public IceUtil::Mutex +{ +public: + + virtual void + setObserverUpdater(const Ice::Instrumentation::ObserverUpdaterPtr& u) + { + updater = u; + } + + virtual Ice::Instrumentation::ObserverPtr + getConnectionEstablishmentObserver(const Ice::EndpointPtr&, const std::string&) + { + IceUtil::Mutex::Lock sync(*this); + if(!connectionEstablishmentObserver) + { + connectionEstablishmentObserver = new ObserverI(); + connectionEstablishmentObserver->reset(); + } + return connectionEstablishmentObserver; + } + + + virtual Ice::Instrumentation::ObserverPtr + getEndpointLookupObserver(const Ice::EndpointPtr&) + { + IceUtil::Mutex::Lock sync(*this); + if(!endpointLookupObserver) + { + endpointLookupObserver = new ObserverI(); + endpointLookupObserver->reset(); + } + return endpointLookupObserver; + } + + virtual Ice::Instrumentation::ConnectionObserverPtr + getConnectionObserver(const Ice::ConnectionInfoPtr&, + const Ice::EndpointPtr&, + Ice::Instrumentation::ConnectionState, + const Ice::Instrumentation::ConnectionObserverPtr& old) + { + IceUtil::Mutex::Lock sync(*this); + test(!old || dynamic_cast<ConnectionObserverI*>(old.get())); + if(!connectionObserver) + { + connectionObserver = new ConnectionObserverI(); + connectionObserver->reset(); + } + return connectionObserver; + } + + virtual Ice::Instrumentation::ThreadObserverPtr + getThreadObserver(const std::string&, const std::string&, Ice::Instrumentation::ThreadState, + const Ice::Instrumentation::ThreadObserverPtr& old) + { + IceUtil::Mutex::Lock sync(*this); + test(!old || dynamic_cast<ThreadObserverI*>(old.get())); + if(!threadObserver) + { + threadObserver = new ThreadObserverI(); + threadObserver->reset(); + } + return threadObserver; + } + + virtual Ice::Instrumentation::InvocationObserverPtr + getInvocationObserver(const Ice::ObjectPrx&, const std::string&, const Ice::Context&) + { + IceUtil::Mutex::Lock sync(*this); + if(!invocationObserver) + { + invocationObserver = new InvocationObserverI(); + invocationObserver->reset(); + } + return invocationObserver; + } + + virtual Ice::Instrumentation::DispatchObserverPtr + getDispatchObserver(const Ice::Current&, Ice::Int) + { + IceUtil::Mutex::Lock sync(*this); + if(!dispatchObserver) + { + dispatchObserver = new DispatchObserverI(); + dispatchObserver->reset(); + } + return dispatchObserver; + } + + void reset() + { + if(connectionEstablishmentObserver) + { + connectionEstablishmentObserver->reset(); + } + if(endpointLookupObserver) + { + endpointLookupObserver->reset(); + } + if(connectionObserver) + { + connectionObserver->reset(); + } + if(threadObserver) + { + threadObserver->reset(); + } + if(invocationObserver) + { + invocationObserver->reset(); + } + if(dispatchObserver) + { + dispatchObserver->reset(); + } + } + + Ice::Instrumentation::ObserverUpdaterPtr updater; + + IceUtil::Handle<ObserverI> connectionEstablishmentObserver; + IceUtil::Handle<ObserverI> endpointLookupObserver; + IceUtil::Handle<ConnectionObserverI> connectionObserver; + IceUtil::Handle<ThreadObserverI> threadObserver; + IceUtil::Handle<InvocationObserverI> invocationObserver; + IceUtil::Handle<DispatchObserverI> dispatchObserver; +}; + +typedef IceUtil::Handle<CommunicatorObserverI> CommunicatorObserverIPtr; + +#endif |