diff options
60 files changed, 2174 insertions, 227 deletions
diff --git a/cpp/include/Ice/MetricsObserverI.h b/cpp/include/Ice/MetricsObserverI.h index 797e9fcbf83..1ea304df7c3 100644 --- a/cpp/include/Ice/MetricsObserverI.h +++ b/cpp/include/Ice/MetricsObserverI.h @@ -430,10 +430,6 @@ public: _metrics->registerMap<MetricsType>(name, this); } - ObserverFactoryT(const std::string& name) : _name(name) - { - } - ~ObserverFactoryT() { if(_metrics) @@ -446,6 +442,10 @@ public: getObserver(const MetricsHelperT<MetricsType>& helper) { IceUtil::Mutex::Lock sync(*this); + if(!_metrics) + { + return 0; + } typename ObserverImplType::EntrySeqType metricsObjects; for(typename MetricsMapSeqType::const_iterator p = _maps.begin(); p != _maps.end(); ++p) @@ -470,13 +470,18 @@ public: template<typename ObserverPtrType> ObserverImplPtrType getObserver(const MetricsHelperT<MetricsType>& helper, const ObserverPtrType& observer) { - if(!observer) + ObserverImplPtrType old = ObserverImplPtrType::dynamicCast(observer); + if(!observer || !old) { return getObserver(helper); } IceUtil::Mutex::Lock sync(*this); - ObserverImplPtrType old = ObserverImplPtrType::dynamicCast(observer); + if(!_metrics) + { + return 0; + } + typename ObserverImplType::EntrySeqType metricsObjects; for(typename MetricsMapSeqType::const_iterator p = _maps.begin(); p != _maps.end(); ++p) { @@ -501,6 +506,7 @@ public: template<typename SubMapMetricsType> void registerSubMap(const std::string& subMap, MetricsMap MetricsType::* member) { + assert(_metrics); _metrics->registerSubMap<SubMapMetricsType>(_name, subMap, member); } @@ -514,6 +520,11 @@ public: UpdaterPtr updater; { IceUtil::Mutex::Lock sync(*this); + if(!_metrics) + { + return; + } + std::vector<IceInternal::MetricsMapIPtr> maps = _metrics->getMaps(_name); _maps.clear(); for(std::vector<IceInternal::MetricsMapIPtr>::const_iterator p = maps.begin(); p != maps.end(); ++p) @@ -537,9 +548,16 @@ public: _updater = updater; } + void destroy() + { + IceUtil::Mutex::Lock sync(*this); + _metrics = 0; + _maps.clear(); + } + private: - const IceInternal::MetricsAdminIPtr _metrics; + IceInternal::MetricsAdminIPtr _metrics; const std::string _name; MetricsMapSeqType _maps; volatile bool _enabled; diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp index f3e17c4dd2d..4646479d978 100644 --- a/cpp/src/Ice/CommunicatorI.cpp +++ b/cpp/src/Ice/CommunicatorI.cpp @@ -261,7 +261,7 @@ Ice::CommunicatorI::getStats() const Ice::Instrumentation::CommunicatorObserverPtr Ice::CommunicatorI::getObserver() const { - return _instance->initializationData().observer; + return _instance->getObserver(); } RouterPrx diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index 60534489cd9..822e26b84ec 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -225,7 +225,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt // Try to establish the connection to the connectors. // DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); - const CommunicatorObserverPtr& obsv = _instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = _instance->getObserver(); vector<ConnectorInfo>::const_iterator q; for(q = connectors.begin(); q != connectors.end(); ++q) { @@ -1122,7 +1122,7 @@ IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector() try { - const CommunicatorObserverPtr& obsv = _factory->_instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = _factory->_instance->getObserver(); if(obsv) { _observer = obsv->getConnectionEstablishmentObserver(_iter->endpoint, _iter->connector->toString()); diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 8091d2f2233..fbf9b2ea5f6 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -525,11 +525,11 @@ Ice::ConnectionI::updateObserver() return; } - assert(_instance->initializationData().observer); - _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), - _endpoint, - toConnectionState(_state), - _observer.get())); + assert(_instance->getObserver()); + _observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(), + _endpoint, + toConnectionState(_state), + _observer.get())); } void @@ -2188,16 +2188,16 @@ Ice::ConnectionI::setState(State state) } } - if(_instance->initializationData().observer) + if(_instance->getObserver()) { ConnectionState oldState = toConnectionState(_state); ConnectionState newState = toConnectionState(state); if(oldState != newState) { - _observer.attach(_instance->initializationData().observer->getConnectionObserver(initConnectionInfo(), - _endpoint, - newState, - _observer.get())); + _observer.attach(_instance->getObserver()->getConnectionObserver(initConnectionInfo(), + _endpoint, + newState, + _observer.get())); } if(_observer && state == StateClosed && _exception.get()) { diff --git a/cpp/src/Ice/EndpointI.cpp b/cpp/src/Ice/EndpointI.cpp index a27c4082faf..7d70754021b 100644 --- a/cpp/src/Ice/EndpointI.cpp +++ b/cpp/src/Ice/EndpointI.cpp @@ -139,7 +139,7 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En } ObserverHelperT<> observer; - const CommunicatorObserverPtr& obsv = _instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = _instance->getObserver(); if(obsv) { observer.attach(obsv->getEndpointLookupObserver(endpoint)); @@ -201,7 +201,7 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, Ice::En entry.endpoint = endpoint; entry.callback = callback; - const CommunicatorObserverPtr& obsv = _instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = _instance->getObserver(); if(obsv) { entry.observer = obsv->getEndpointLookupObserver(endpoint); @@ -310,7 +310,7 @@ void IceInternal::EndpointHostResolver::updateObserver() { Lock sync(*this); - const CommunicatorObserverPtr& obsv = _instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = _instance->getObserver(); if(obsv) { _observer.attach(obsv->getThreadObserver("Communicator", name(), ThreadStateIdle, _observer.get())); diff --git a/cpp/src/Ice/Incoming.cpp b/cpp/src/Ice/Incoming.cpp index 928c3c50088..e516940c84f 100644 --- a/cpp/src/Ice/Incoming.cpp +++ b/cpp/src/Ice/Incoming.cpp @@ -592,7 +592,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre _current.ctx.insert(_current.ctx.end(), pr); } - const CommunicatorObserverPtr& obsv = _is->instance()->initializationData().observer; + const CommunicatorObserverPtr& obsv = _is->instance()->getObserver(); if(obsv) { // Read the parameter encapsulation size. diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index f5a61cb9995..19fa9701352 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -1148,18 +1148,20 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi // Setup the communicator observer only if the user didn't already set an // Ice observer resolver and if the admininistrative endpoints are set. // - if(!_initData.observer && - (_adminFacetFilter.empty() || _adminFacetFilter.find("Metrics") != _adminFacetFilter.end()) && + if((_adminFacetFilter.empty() || _adminFacetFilter.find("Metrics") != _adminFacetFilter.end()) && _initData.properties->getProperty("Ice.Admin.Endpoints") != "") { - CommunicatorObserverIPtr observer = new CommunicatorObserverI(_metricsAdmin); - _initData.observer = observer; + _observer = new CommunicatorObserverI(_metricsAdmin, _initData.observer); // // Make sure the admin plugin receives property updates. // props->addUpdateCallback(_metricsAdmin); } + else + { + _observer = _initData.observer; + } __setNoDelete(false); } @@ -1229,10 +1231,10 @@ IceInternal::Instance::finishSetup(int& argc, char* argv[]) // // Set observer updater // - if(_initData.observer) + if(_observer) { - theCollector->updateObserver(_initData.observer); - _initData.observer->setObserverUpdater(new ObserverUpdaterI(this)); + theCollector->updateObserver(_observer); + _observer->setObserverUpdater(new ObserverUpdaterI(this)); } // @@ -1398,18 +1400,20 @@ IceInternal::Instance::destroy() _retryQueue->destroy(); } - if(_initData.observer && theCollector) + if(_observer && theCollector) { - theCollector->clearObserver(_initData.observer); + theCollector->clearObserver(_observer); } if(_metricsAdmin) { _metricsAdmin->destroy(); _metricsAdmin = 0; - if(CommunicatorObserverIPtr::dynamicCast(_initData.observer)) + + // Break cyclic reference counts. Don't clear _observer, it's immutable. + if(_observer) { - _initData.observer = 0; // Clear cyclic reference counts. + CommunicatorObserverIPtr::dynamicCast(_observer)->destroy(); } } @@ -1567,7 +1571,7 @@ IceInternal::Instance::updateThreadObservers() _endpointHostResolver->updateObserver(); } assert(theCollector); - theCollector->updateObserver(_initData.observer); + theCollector->updateObserver(_observer); } catch(const Ice::CommunicatorDestroyedException&) { diff --git a/cpp/src/Ice/Instance.h b/cpp/src/Ice/Instance.h index 16aa25648b0..34261196016 100644 --- a/cpp/src/Ice/Instance.h +++ b/cpp/src/Ice/Instance.h @@ -93,6 +93,11 @@ public: void addAdminFacet(const Ice::ObjectPtr&, const std::string&); Ice::ObjectPtr removeAdminFacet(const std::string&); Ice::ObjectPtr findAdminFacet(const std::string&); + + const Ice::Instrumentation::CommunicatorObserverPtr& getObserver() const + { + return _observer; + } const Ice::ImplicitContextIPtr& getImplicitContext() const { @@ -157,6 +162,7 @@ private: Ice::Identity _adminIdentity; std::set<std::string> _adminFacetFilter; IceInternal::MetricsAdminIPtr _metricsAdmin; + Ice::Instrumentation::CommunicatorObserverPtr _observer; }; class ProcessI : public Ice::Process diff --git a/cpp/src/Ice/InstrumentationI.cpp b/cpp/src/Ice/InstrumentationI.cpp index 82598bbf1dc..fcfae4200a7 100644 --- a/cpp/src/Ice/InstrumentationI.cpp +++ b/cpp/src/Ice/InstrumentationI.cpp @@ -694,48 +694,81 @@ void ConnectionObserverI::sentBytes(Int num) { forEach(add(&ConnectionMetrics::sentBytes, num)); + if(_delegate) + { + _delegate->sentBytes(num); + } } void ConnectionObserverI::receivedBytes(Int num) { forEach(add(&ConnectionMetrics::receivedBytes, num)); + if(_delegate) + { + _delegate->receivedBytes(num); + } } void ThreadObserverI::stateChanged(ThreadState oldState, ThreadState newState) { forEach(ThreadStateChanged(oldState, newState)); + if(_delegate) + { + _delegate->stateChanged(oldState, newState); + } + } void DispatchObserverI::userException() { forEach(inc(&DispatchMetrics::userException)); + if(_delegate) + { + _delegate->userException(); + } } void DispatchObserverI::reply(Int size) { forEach(add(&DispatchMetrics::replySize, size)); + if(_delegate) + { + _delegate->reply(size); + } } void RemoteObserverI::reply(Int size) { forEach(add(&RemoteMetrics::replySize, size)); + if(_delegate) + { + _delegate->reply(size); + } } void InvocationObserverI::retried() { forEach(inc(&InvocationMetrics::retry)); + if(_delegate) + { + _delegate->retried(); + } } void InvocationObserverI::userException() { forEach(inc(&InvocationMetrics::userException)); + if(_delegate) + { + _delegate->userException(); + } } RemoteObserverPtr @@ -746,7 +779,14 @@ InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection, { try { - return getObserver<RemoteObserverI>("Remote", RemoteInvocationHelper(connection, endpoint, requestId, size)); + RemoteObserverPtr delegate; + if(_delegate) + { + delegate = _delegate->getRemoteObserver(connection, endpoint, requestId, size); + } + return getObserver<RemoteObserverI>("Remote", + RemoteInvocationHelper(connection, endpoint, requestId, size), + delegate); } catch(const exception&) { @@ -754,8 +794,11 @@ InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection, return 0; } -CommunicatorObserverI::CommunicatorObserverI(const IceInternal::MetricsAdminIPtr& metrics) : +CommunicatorObserverI::CommunicatorObserverI(const IceInternal::MetricsAdminIPtr& metrics, + const Ice::Instrumentation::CommunicatorObserverPtr& delegate) : _metrics(metrics), + _logger(metrics->getLogger()), + _delegate(delegate), _connections(metrics, "Connection"), _dispatch(metrics, "Dispatch"), _invocations(metrics, "Invocation"), @@ -771,6 +814,10 @@ CommunicatorObserverI::setObserverUpdater(const ObserverUpdaterPtr& updater) { _connections.setUpdater(newUpdater(updater, &ObserverUpdater::updateConnectionObservers)); _threads.setUpdater(newUpdater(updater, &ObserverUpdater::updateThreadObservers)); + if(_delegate) + { + _delegate->setObserverUpdater(updater); + } } ObserverPtr @@ -780,11 +827,16 @@ CommunicatorObserverI::getConnectionEstablishmentObserver(const EndpointPtr& end { try { - return _connects.getObserver(EndpointHelper(endpt, connector)); + ObserverPtr delegate; + if(_delegate) + { + delegate = _delegate->getConnectionEstablishmentObserver(endpt, connector); + } + return _connects.getObserver(EndpointHelper(endpt, connector), delegate); } catch(const exception& ex) { - Error error(_metrics->getLogger()); + Error error(_logger); error << "unexpected exception trying to obtain observer:\n" << ex; } } @@ -798,11 +850,16 @@ CommunicatorObserverI::getEndpointLookupObserver(const EndpointPtr& endpt) { try { - return _endpointLookups.getObserver(EndpointHelper(endpt)); + ObserverPtr delegate; + if(_delegate) + { + delegate = _delegate->getEndpointLookupObserver(endpt); + } + return _endpointLookups.getObserver(EndpointHelper(endpt), delegate); } catch(const exception& ex) { - Error error(_metrics->getLogger()); + Error error(_logger); error << "unexpected exception trying to obtain observer:\n" << ex; } } @@ -819,11 +876,17 @@ CommunicatorObserverI::getConnectionObserver(const ConnectionInfoPtr& con, { try { - return _connections.getObserver(ConnectionHelper(con, endpt, state), observer); + ConnectionObserverPtr delegate; + ConnectionObserverI* o = dynamic_cast<ConnectionObserverI*>(observer.get()); + if(_delegate) + { + delegate = _delegate->getConnectionObserver(con, endpt, state, o ? o->getDelegate() : observer); + } + return _connections.getObserver(ConnectionHelper(con, endpt, state), delegate, observer); } catch(const exception& ex) { - Error error(_metrics->getLogger()); + Error error(_logger); error << "unexpected exception trying to obtain observer:\n" << ex; } } @@ -840,11 +903,17 @@ CommunicatorObserverI::getThreadObserver(const string& parent, { try { - return _threads.getObserver(ThreadHelper(parent, id, state), observer); + ThreadObserverPtr delegate; + ThreadObserverI* o = dynamic_cast<ThreadObserverI*>(observer.get()); + if(_delegate) + { + delegate = _delegate->getThreadObserver(parent, id, state, o ? o->getDelegate() : observer); + } + return _threads.getObserver(ThreadHelper(parent, id, state), delegate, observer); } catch(const exception& ex) { - Error error(_metrics->getLogger()); + Error error(_logger); error << "unexpected exception trying to obtain observer:\n" << ex; } } @@ -858,11 +927,16 @@ CommunicatorObserverI::getInvocationObserver(const ObjectPrx& proxy, const strin { try { - return _invocations.getObserver(InvocationHelper(proxy, op, ctx)); + InvocationObserverPtr delegate; + if(_delegate) + { + delegate = _delegate->getInvocationObserver(proxy, op, ctx); + } + return _invocations.getObserver(InvocationHelper(proxy, op, ctx), delegate); } catch(const exception& ex) { - Error error(_metrics->getLogger()); + Error error(_logger); error << "unexpected exception trying to obtain observer:\n" << ex; } } @@ -876,11 +950,16 @@ CommunicatorObserverI::getDispatchObserver(const Current& current, int size) { try { - return _dispatch.getObserver(DispatchHelper(current, size)); + DispatchObserverPtr delegate; + if(_delegate) + { + delegate = _delegate->getDispatchObserver(current, size); + } + return _dispatch.getObserver(DispatchHelper(current, size), delegate); } catch(const exception& ex) { - Error error(_metrics->getLogger()); + Error error(_logger); error << "unexpected exception trying to obtain observer:\n" << ex; } } @@ -890,5 +969,18 @@ CommunicatorObserverI::getDispatchObserver(const Current& current, int size) const IceInternal::MetricsAdminIPtr& CommunicatorObserverI::getMetricsAdmin() const { + assert(_metrics); return _metrics; } + +void +CommunicatorObserverI::destroy() +{ + _metrics = 0; + _connections.destroy(); + _dispatch.destroy(); + _invocations.destroy(); + _threads.destroy(); + _connects.destroy(); + _endpointLookups.destroy(); +} diff --git a/cpp/src/Ice/InstrumentationI.h b/cpp/src/Ice/InstrumentationI.h index 9f6b61327b9..1db093feae2 100644 --- a/cpp/src/Ice/InstrumentationI.h +++ b/cpp/src/Ice/InstrumentationI.h @@ -16,6 +16,109 @@ namespace IceInternal { +template<typename T, typename O> class ObserverWithDelegateT : public IceMX::ObserverT<T>, virtual public O +{ +public: + + typedef O ObserverType; + typedef typename IceInternal::Handle<O> ObserverPtrType; + + virtual void + attach() + { + IceMX::ObserverT<T>::attach(); + if(_delegate) + { + _delegate->attach(); + } + } + + virtual void + detach() + { + IceMX::ObserverT<T>::detach(); + if(_delegate) + { + _delegate->detach(); + } + } + + virtual void + failed(const std::string& exceptionName) + { + IceMX::ObserverT<T>::failed(exceptionName); + if(_delegate) + { + _delegate->failed(exceptionName); + } + } + + ObserverPtrType + getDelegate() const + { + return _delegate; + } + + void + setDelegate(ObserverPtrType delegate) + { + _delegate = delegate; + } + + template<typename ObserverImpl, typename ObserverMetricsType, typename ObserverPtrType> ObserverPtrType + getObserver(const std::string& mapName, const IceMX::MetricsHelperT<ObserverMetricsType>& helper, + const ObserverPtrType& del) + { + IceInternal::Handle<ObserverImpl> obsv = IceMX::ObserverT<T>::template getObserver<ObserverImpl>(mapName, + helper); + if(obsv) + { + obsv->setDelegate(del); + return obsv; + } + return del; + } + +protected: + + ObserverPtrType _delegate; +}; + +template<typename T> class ObserverFactoryWithDelegateT : public IceMX::ObserverFactoryT<T> +{ +public: + + ObserverFactoryWithDelegateT(const IceInternal::MetricsAdminIPtr& metrics, const std::string& name) : + IceMX::ObserverFactoryT<T>(metrics, name) + { + } + + template<typename ObserverMetricsType, typename ObserverPtrType> ObserverPtrType + getObserver(const IceMX::MetricsHelperT<ObserverMetricsType>& helper, const ObserverPtrType& del) + { + IceInternal::Handle<T> obsv = IceMX::ObserverFactoryT<T>::getObserver(helper); + if(obsv) + { + obsv->setDelegate(del); + return obsv; + } + return del; + } + + template<typename ObserverMetricsType, typename ObserverPtrType> ObserverPtrType + getObserver(const IceMX::MetricsHelperT<ObserverMetricsType>& helper, const ObserverPtrType& del, + const ObserverPtrType& old) + { + IceInternal::Handle<T> obsv = IceMX::ObserverFactoryT<T>::getObserver(helper, old); + if(obsv) + { + obsv->setDelegate(del); + return obsv; + } + return del; + } +}; + template<typename Helper> void addEndpointAttributes(typename Helper::Attributes& attrs) { @@ -49,8 +152,8 @@ void addConnectionAttributes(typename Helper::Attributes& attrs) addEndpointAttributes<Helper>(attrs); } -class ConnectionObserverI : public Ice::Instrumentation::ConnectionObserver, - public IceMX::ObserverT<IceMX::ConnectionMetrics> +class ConnectionObserverI : public ObserverWithDelegateT<IceMX::ConnectionMetrics, + Ice::Instrumentation::ConnectionObserver> { public: @@ -58,16 +161,14 @@ public: virtual void receivedBytes(Ice::Int); }; -class ThreadObserverI : public Ice::Instrumentation::ThreadObserver, - public IceMX::ObserverT<IceMX::ThreadMetrics> +class ThreadObserverI : public ObserverWithDelegateT<IceMX::ThreadMetrics, Ice::Instrumentation::ThreadObserver> { public: virtual void stateChanged(Ice::Instrumentation::ThreadState, Ice::Instrumentation::ThreadState); }; -class DispatchObserverI : public Ice::Instrumentation::DispatchObserver, - public IceMX::ObserverT<IceMX::DispatchMetrics> +class DispatchObserverI : public ObserverWithDelegateT<IceMX::DispatchMetrics, Ice::Instrumentation::DispatchObserver> { public: @@ -76,14 +177,15 @@ public: virtual void reply(Ice::Int); }; -class RemoteObserverI : public Ice::Instrumentation::RemoteObserver, - public IceMX::ObserverT<IceMX::RemoteMetrics> +class RemoteObserverI : public ObserverWithDelegateT<IceMX::RemoteMetrics, Ice::Instrumentation::RemoteObserver> { +public: + virtual void reply(Ice::Int); }; -class InvocationObserverI : public Ice::Instrumentation::InvocationObserver, - public IceMX::ObserverT<IceMX::InvocationMetrics> +class InvocationObserverI : public ObserverWithDelegateT<IceMX::InvocationMetrics, + Ice::Instrumentation::InvocationObserver> { public: @@ -95,11 +197,15 @@ public: const Ice::EndpointPtr&, Ice::Int, Ice::Int); }; +typedef ObserverWithDelegateT<IceMX::Metrics, Ice::Instrumentation::Observer> ObserverI; + class ICE_API CommunicatorObserverI : public Ice::Instrumentation::CommunicatorObserver { public: - CommunicatorObserverI(const IceInternal::MetricsAdminIPtr&); + CommunicatorObserverI(const IceInternal::MetricsAdminIPtr&, + const Ice::Instrumentation::CommunicatorObserverPtr& = + Ice::Instrumentation::CommunicatorObserverPtr()); virtual void setObserverUpdater(const Ice::Instrumentation::ObserverUpdaterPtr&); @@ -126,16 +232,20 @@ public: const IceInternal::MetricsAdminIPtr& getMetricsAdmin() const; + void destroy(); + private: - const IceInternal::MetricsAdminIPtr _metrics; + IceInternal::MetricsAdminIPtr _metrics; + Ice::LoggerPtr _logger; + const Ice::Instrumentation::CommunicatorObserverPtr _delegate; - IceMX::ObserverFactoryT<ConnectionObserverI> _connections; - IceMX::ObserverFactoryT<DispatchObserverI> _dispatch; - IceMX::ObserverFactoryT<InvocationObserverI> _invocations; - IceMX::ObserverFactoryT<ThreadObserverI> _threads; - IceMX::ObserverFactoryT<IceMX::ObserverI> _connects; - IceMX::ObserverFactoryT<IceMX::ObserverI> _endpointLookups; + ObserverFactoryWithDelegateT<ConnectionObserverI> _connections; + ObserverFactoryWithDelegateT<DispatchObserverI> _dispatch; + ObserverFactoryWithDelegateT<InvocationObserverI> _invocations; + ObserverFactoryWithDelegateT<ThreadObserverI> _threads; + ObserverFactoryWithDelegateT<ObserverI> _connects; + ObserverFactoryWithDelegateT<ObserverI> _endpointLookups; }; typedef IceUtil::Handle<CommunicatorObserverI> CommunicatorObserverIPtr; diff --git a/cpp/src/Ice/ObserverHelper.cpp b/cpp/src/Ice/ObserverHelper.cpp index ca148bff6da..4215d77eabd 100644 --- a/cpp/src/Ice/ObserverHelper.cpp +++ b/cpp/src/Ice/ObserverHelper.cpp @@ -25,7 +25,7 @@ Ice::Context emptyCtx; IceInternal::InvocationObserver::InvocationObserver(IceProxy::Ice::Object* proxy, const string& op, const Context* ctx) { - const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->initializationData().observer; + const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->getObserver(); if(!obsv) { return; @@ -43,7 +43,7 @@ IceInternal::InvocationObserver::InvocationObserver(IceProxy::Ice::Object* proxy IceInternal::InvocationObserver::InvocationObserver(IceInternal::Instance* instance, const string& op) { - const CommunicatorObserverPtr& obsv = instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = instance->getObserver(); if(!obsv) { return; @@ -55,7 +55,7 @@ IceInternal::InvocationObserver::InvocationObserver(IceInternal::Instance* insta void IceInternal::InvocationObserver::attach(IceProxy::Ice::Object* proxy, const string& op, const Context* ctx) { - const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->initializationData().observer; + const CommunicatorObserverPtr& obsv = proxy->__reference()->getInstance()->getObserver(); if(!obsv) { return; @@ -74,7 +74,7 @@ IceInternal::InvocationObserver::attach(IceProxy::Ice::Object* proxy, const stri void IceInternal::InvocationObserver::attach(IceInternal::Instance* instance, const string& op) { - const CommunicatorObserverPtr& obsv = instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = instance->getObserver(); if(!obsv) { return; diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 62652c786d2..2234e8c7ceb 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -1128,7 +1128,7 @@ void IceInternal::ThreadPool::EventHandlerThread::updateObserver() { // Must be called with the thread pool mutex locked - const CommunicatorObserverPtr& obsv = _pool->_instance->initializationData().observer; + const CommunicatorObserverPtr& obsv = _pool->_instance->getObserver(); if(obsv) { _observer.attach(obsv->getThreadObserver(_pool->_prefix, name(), _state, _observer.get())); diff --git a/cpp/test/Ice/metrics/AllTests.cpp b/cpp/test/Ice/metrics/AllTests.cpp index a181f43fbb1..ea097a349ac 100644 --- a/cpp/test/Ice/metrics/AllTests.cpp +++ b/cpp/test/Ice/metrics/AllTests.cpp @@ -9,6 +9,7 @@ #include <Ice/Ice.h> #include <TestCommon.h> +#include <InstrumentationI.h> #include <Test.h> using namespace std; @@ -368,7 +369,7 @@ toMap(const IceMX::MetricsMap& mmap) } MetricsPrx -allTests(const Ice::CommunicatorPtr& communicator) +allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPtr& obsv) { MetricsPrx metrics = MetricsPrx::checkedCast(communicator->stringToProxy("metrics:default -p 12010")); @@ -1047,5 +1048,39 @@ allTests(const Ice::CommunicatorPtr& communicator) cout << "ok" << endl; + cout << "testing instrumentation observer delegate... " << flush; + + test(obsv->threadObserver->total > 0); + test(obsv->connectionObserver->total > 0); + test(obsv->connectionEstablishmentObserver->total > 0); + test(obsv->endpointLookupObserver->total > 0); + test(obsv->dispatchObserver->total > 0); + test(obsv->invocationObserver->total > 0); + test(obsv->invocationObserver->remoteObserver->total > 0); + + test(obsv->threadObserver->current > 0); + test(obsv->connectionObserver->current > 0); + test(obsv->connectionEstablishmentObserver->current == 0); + test(obsv->endpointLookupObserver->current == 0); + test(obsv->dispatchObserver->current == 0); + test(obsv->invocationObserver->current == 0); + test(obsv->invocationObserver->remoteObserver->current == 0); + + test(obsv->threadObserver->failedCount == 0); + test(obsv->connectionObserver->failedCount > 0); + test(obsv->connectionEstablishmentObserver->failedCount > 0); + test(obsv->endpointLookupObserver->failedCount > 0); + //test(obsv->dispatchObserver->failedCount > 0); + test(obsv->invocationObserver->failedCount > 0); + test(obsv->invocationObserver->remoteObserver->failedCount > 0); + + test(obsv->threadObserver->states > 0); + test(obsv->connectionObserver->received > 0 && obsv->connectionObserver->sent > 0); + //test(obsv->dispatchObserver->userExceptionCount > 0); + test(obsv->invocationObserver->userExceptionCount > 0 && obsv->invocationObserver->retriedCount > 0); + test(obsv->invocationObserver->remoteObserver->replySize > 0); + + cout << "ok" << endl; + return metrics; } diff --git a/cpp/test/Ice/metrics/Client.cpp b/cpp/test/Ice/metrics/Client.cpp index f30ad6032ad..4f541c2f7cc 100644 --- a/cpp/test/Ice/metrics/Client.cpp +++ b/cpp/test/Ice/metrics/Client.cpp @@ -10,6 +10,7 @@ #include <Ice/Ice.h> #include <TestCommon.h> #include <Test.h> +#include <InstrumentationI.h> DEFINE_TEST("client") @@ -17,10 +18,10 @@ using namespace std; using namespace Test; int -run(int, char**, const Ice::CommunicatorPtr& communicator) +run(int, char**, const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPtr& observer) { - MetricsPrx allTests(const Ice::CommunicatorPtr&); - MetricsPrx metrics = allTests(communicator); + MetricsPrx allTests(const Ice::CommunicatorPtr&, const CommunicatorObserverIPtr&); + MetricsPrx metrics = allTests(communicator, observer); metrics->shutdown(); return EXIT_SUCCESS; } @@ -40,8 +41,10 @@ main(int argc, char* argv[]) initData.properties->setProperty("Ice.Admin.DelayCreation", "1"); initData.properties->setProperty("Ice.Warn.Connections", "0"); initData.properties->setProperty("Ice.MessageSizeMax", "50000"); + CommunicatorObserverIPtr observer = new CommunicatorObserverI(); + initData.observer = observer; communicator = Ice::initialize(argc, argv, initData); - status = run(argc, argv, communicator); + status = run(argc, argv, communicator, observer); } catch(const Ice::Exception& ex) { diff --git a/cpp/test/Ice/metrics/InstrumentationI.h b/cpp/test/Ice/metrics/InstrumentationI.h new file mode 100644 index 00000000000..92ff7e1d11d --- /dev/null +++ b/cpp/test/Ice/metrics/InstrumentationI.h @@ -0,0 +1,334 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef INSTRUMENTATION_I_H +#define INSTRUMENTATION_I_H + +#include <Ice/Instrumentation.h> + +class ObserverI : virtual public Ice::Instrumentation::Observer, public IceUtil::Mutex +{ +public: + + virtual void + reset() + { + total = 0; + current = 0; + failedCount = 0; + } + + virtual void + attach() + { + IceUtil::Mutex::Lock sync(*this); + ++total; + ++current; + } + virtual void + detach() + { + IceUtil::Mutex::Lock sync(*this); + --current; + } + virtual void + failed(const std::string&) + { + IceUtil::Mutex::Lock sync(*this); + ++failedCount; + } + + Ice::Int total; + Ice::Int current; + Ice::Int failedCount; +}; + +class ConnectionObserverI : public Ice::Instrumentation::ConnectionObserver, public ObserverI +{ +public: + + virtual void + reset() + { + IceUtil::Mutex::Lock sync(*this); + ObserverI::reset(); + received = 0; + sent = 0; + } + + virtual void + sentBytes(Ice::Int s) + { + IceUtil::Mutex::Lock sync(*this); + sent += s; + } + + virtual void + receivedBytes(Ice::Int s) + { + IceUtil::Mutex::Lock sync(*this); + received += s; + } + + Ice::Int sent; + Ice::Int received; +}; + +class ThreadObserverI : public Ice::Instrumentation::ThreadObserver, public ObserverI +{ +public: + + virtual void + reset() + { + IceUtil::Mutex::Lock sync(*this); + ObserverI::reset(); + states = 0; + } + + virtual void + stateChanged(Ice::Instrumentation::ThreadState, Ice::Instrumentation::ThreadState) + { + IceUtil::Mutex::Lock sync(*this); + ++states; + } + + Ice::Int states; +}; + +class DispatchObserverI : public Ice::Instrumentation::DispatchObserver, public ObserverI +{ +public: + + virtual void reset() + { + IceUtil::Mutex::Lock sync(*this); + ObserverI::reset(); + userExceptionCount = 0; + replySize = 0; + } + + virtual void + userException() + { + IceUtil::Mutex::Lock sync(*this); + ++userExceptionCount; + } + + virtual void + reply(Ice::Int s) + { + IceUtil::Mutex::Lock sync(*this); + replySize += s; + } + + Ice::Int userExceptionCount; + Ice::Int replySize; +}; + +class RemoteObserverI : public Ice::Instrumentation::RemoteObserver, public ObserverI +{ +public: + + virtual void + reset() + { + IceUtil::Mutex::Lock sync(*this); + ObserverI::reset(); + replySize = 0; + } + + virtual void + reply(Ice::Int s) + { + IceUtil::Mutex::Lock sync(*this); + replySize += s; + } + + Ice::Int replySize; +}; + +class InvocationObserverI : public Ice::Instrumentation::InvocationObserver, public ObserverI +{ +public: + + virtual void reset() + { + IceUtil::Mutex::Lock sync(*this); + ObserverI::reset(); + retriedCount = 0; + userExceptionCount = 0; + if(remoteObserver) + { + remoteObserver->reset(); + } + } + + virtual void + retried() + { + IceUtil::Mutex::Lock sync(*this); + ++retriedCount; + } + + virtual void + userException() + { + IceUtil::Mutex::Lock sync(*this); + ++userExceptionCount; + } + + virtual Ice::Instrumentation::RemoteObserverPtr + getRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& e, Ice::Int, Ice::Int) + { + IceUtil::Mutex::Lock sync(*this); + if(!remoteObserver) + { + remoteObserver = new RemoteObserverI(); + remoteObserver->reset(); + } + return remoteObserver; + } + + Ice::Int userExceptionCount; + Ice::Int retriedCount; + + IceUtil::Handle<RemoteObserverI> remoteObserver; +}; + +class CommunicatorObserverI : public Ice::Instrumentation::CommunicatorObserver, public IceUtil::Mutex +{ +public: + + virtual void + setObserverUpdater(const Ice::Instrumentation::ObserverUpdaterPtr& u) + { + updater = u; + } + + virtual Ice::Instrumentation::ObserverPtr + getConnectionEstablishmentObserver(const Ice::EndpointPtr&, const std::string&) + { + IceUtil::Mutex::Lock sync(*this); + if(!connectionEstablishmentObserver) + { + connectionEstablishmentObserver = new ObserverI(); + connectionEstablishmentObserver->reset(); + } + return connectionEstablishmentObserver; + } + + + virtual Ice::Instrumentation::ObserverPtr + getEndpointLookupObserver(const Ice::EndpointPtr&) + { + IceUtil::Mutex::Lock sync(*this); + if(!endpointLookupObserver) + { + endpointLookupObserver = new ObserverI(); + endpointLookupObserver->reset(); + } + return endpointLookupObserver; + } + + virtual Ice::Instrumentation::ConnectionObserverPtr + getConnectionObserver(const Ice::ConnectionInfoPtr&, + const Ice::EndpointPtr&, + Ice::Instrumentation::ConnectionState, + const Ice::Instrumentation::ConnectionObserverPtr& old) + { + IceUtil::Mutex::Lock sync(*this); + test(!old || dynamic_cast<ConnectionObserverI*>(old.get())); + if(!connectionObserver) + { + connectionObserver = new ConnectionObserverI(); + connectionObserver->reset(); + } + return connectionObserver; + } + + virtual Ice::Instrumentation::ThreadObserverPtr + getThreadObserver(const std::string&, const std::string&, Ice::Instrumentation::ThreadState, + const Ice::Instrumentation::ThreadObserverPtr& old) + { + IceUtil::Mutex::Lock sync(*this); + test(!old || dynamic_cast<ThreadObserverI*>(old.get())); + if(!threadObserver) + { + threadObserver = new ThreadObserverI(); + threadObserver->reset(); + } + return threadObserver; + } + + virtual Ice::Instrumentation::InvocationObserverPtr + getInvocationObserver(const Ice::ObjectPrx&, const std::string&, const Ice::Context&) + { + IceUtil::Mutex::Lock sync(*this); + if(!invocationObserver) + { + invocationObserver = new InvocationObserverI(); + invocationObserver->reset(); + } + return invocationObserver; + } + + virtual Ice::Instrumentation::DispatchObserverPtr + getDispatchObserver(const Ice::Current&, Ice::Int) + { + IceUtil::Mutex::Lock sync(*this); + if(!dispatchObserver) + { + dispatchObserver = new DispatchObserverI(); + dispatchObserver->reset(); + } + return dispatchObserver; + } + + void reset() + { + if(connectionEstablishmentObserver) + { + connectionEstablishmentObserver->reset(); + } + if(endpointLookupObserver) + { + endpointLookupObserver->reset(); + } + if(connectionObserver) + { + connectionObserver->reset(); + } + if(threadObserver) + { + threadObserver->reset(); + } + if(invocationObserver) + { + invocationObserver->reset(); + } + if(dispatchObserver) + { + dispatchObserver->reset(); + } + } + + Ice::Instrumentation::ObserverUpdaterPtr updater; + + IceUtil::Handle<ObserverI> connectionEstablishmentObserver; + IceUtil::Handle<ObserverI> endpointLookupObserver; + IceUtil::Handle<ConnectionObserverI> connectionObserver; + IceUtil::Handle<ThreadObserverI> threadObserver; + IceUtil::Handle<InvocationObserverI> invocationObserver; + IceUtil::Handle<DispatchObserverI> dispatchObserver; +}; + +typedef IceUtil::Handle<CommunicatorObserverI> CommunicatorObserverIPtr; + +#endif diff --git a/cs/src/Ice/AsyncIOThread.cs b/cs/src/Ice/AsyncIOThread.cs index ebe6ea43562..cacff0a48f3 100644 --- a/cs/src/Ice/AsyncIOThread.cs +++ b/cs/src/Ice/AsyncIOThread.cs @@ -45,7 +45,7 @@ namespace IceInternal _m.Lock(); try { - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) { _observer = obsv.getThreadObserver("Communicator", diff --git a/cs/src/Ice/CommunicatorI.cs b/cs/src/Ice/CommunicatorI.cs index df3ff1e1653..a910a012424 100644 --- a/cs/src/Ice/CommunicatorI.cs +++ b/cs/src/Ice/CommunicatorI.cs @@ -126,7 +126,7 @@ namespace Ice public Ice.Instrumentation.CommunicatorObserver getObserver() { - return instance_.initializationData().observer; + return instance_.getObserver(); } public RouterPrx getDefaultRouter() diff --git a/cs/src/Ice/ConnectionFactory.cs b/cs/src/Ice/ConnectionFactory.cs index 886dd4dd2de..848d59d323c 100644 --- a/cs/src/Ice/ConnectionFactory.cs +++ b/cs/src/Ice/ConnectionFactory.cs @@ -238,7 +238,7 @@ namespace IceInternal // Try to establish the connection to the connectors. // DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); ConnectorInfo ci = null; for(int i = 0; i < connectors.Count; ++i) { @@ -1251,7 +1251,7 @@ namespace IceInternal Debug.Assert(_iter < _connectors.Count); _current = _connectors[_iter++]; - Ice.Instrumentation.CommunicatorObserver obsv = _factory._instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _factory._instance.getObserver(); if(obsv != null) { _observer = obsv.getConnectionEstablishmentObserver(_current.endpoint, diff --git a/cs/src/Ice/ConnectionI.cs b/cs/src/Ice/ConnectionI.cs index f03ce804672..8b1facc56e0 100644 --- a/cs/src/Ice/ConnectionI.cs +++ b/cs/src/Ice/ConnectionI.cs @@ -325,11 +325,11 @@ namespace Ice return; } - Debug.Assert(_instance.initializationData().observer != null); - _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(), - _endpoint, - toConnectionState(_state), - _observer); + Debug.Assert(_instance.getObserver() != null); + _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), + _endpoint, + toConnectionState(_state), + _observer); if(_observer != null) { _observer.attach(); @@ -2062,16 +2062,16 @@ namespace Ice } } - if(_instance.initializationData().observer != null) + if(_instance.getObserver() != null) { ConnectionState oldState = toConnectionState(_state); ConnectionState newState = toConnectionState(state); if(oldState != newState) { - _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(), - _endpoint, - newState, - _observer); + _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), + _endpoint, + newState, + _observer); if(_observer != null) { _observer.attach(); diff --git a/cs/src/Ice/EndpointHostResolver.cs b/cs/src/Ice/EndpointHostResolver.cs index e65fc6fbf0c..7ef8a44b2bf 100644 --- a/cs/src/Ice/EndpointHostResolver.cs +++ b/cs/src/Ice/EndpointHostResolver.cs @@ -54,7 +54,7 @@ namespace IceInternal } } - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); Ice.Instrumentation.Observer observer = null; if(obsv != null) { @@ -133,7 +133,7 @@ namespace IceInternal entry.endpoint = endpoint; entry.callback = callback; - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) { entry.observer = obsv.getEndpointLookupObserver(endpoint); @@ -273,7 +273,7 @@ namespace IceInternal _m.Lock(); try { - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) { _observer = obsv.getThreadObserver("Communicator", diff --git a/cs/src/Ice/Incoming.cs b/cs/src/Ice/Incoming.cs index c185772dbdc..c4b723f40cc 100644 --- a/cs/src/Ice/Incoming.cs +++ b/cs/src/Ice/Incoming.cs @@ -689,7 +689,7 @@ namespace IceInternal current_.ctx[first] = second; } - Ice.Instrumentation.CommunicatorObserver obsv = instance_.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = instance_.getObserver(); if(obsv != null) { // Read the encapsulation size. diff --git a/cs/src/Ice/Instance.cs b/cs/src/Ice/Instance.cs index 06ff92c55ff..7fe0cb447a3 100644 --- a/cs/src/Ice/Instance.cs +++ b/cs/src/Ice/Instance.cs @@ -592,6 +592,12 @@ namespace IceInternal } } + public Ice.Instrumentation.CommunicatorObserver + getObserver() + { + return _observer; + } + public void setDefaultLocator(Ice.LocatorPrx locator) { @@ -870,18 +876,20 @@ namespace IceInternal // Setup the communicator observer only if the user didn't already set an // Ice observer resolver and if the admininistrative endpoints are set. // - if(_initData.observer == null && - (_adminFacetFilter.Count == 0 || _adminFacetFilter.Contains("Metrics")) && + if((_adminFacetFilter.Count == 0 || _adminFacetFilter.Contains("Metrics")) && _initData.properties.getProperty("Ice.Admin.Endpoints").Length > 0) { - CommunicatorObserverI observer = new CommunicatorObserverI(admin); - _initData.observer = observer; + _observer = new CommunicatorObserverI(admin, _initData.observer); // // Make sure the admin plugin receives property updates. // props.addUpdateCallback(admin); } + else + { + _observer = initData.observer; + } } catch(Ice.LocalException) { @@ -904,9 +912,9 @@ namespace IceInternal // // Set observer updater // - if(_initData.observer != null) + if(_observer != null) { - _initData.observer.setObserverUpdater(new ObserverUpdaterI(this)); + _observer.setObserverUpdater(new ObserverUpdaterI(this)); } // @@ -1265,6 +1273,7 @@ namespace IceInternal private int _clientACM; // Immutable, not reset by destroy(). private int _serverACM; // Immutable, not reset by destroy(). private Ice.ImplicitContextI _implicitContext; // Immutable + private Ice.Instrumentation.CommunicatorObserver _observer; // Immutable private RouterManager _routerManager; private LocatorManager _locatorManager; private ReferenceFactory _referenceFactory; diff --git a/cs/src/Ice/InstrumentationI.cs b/cs/src/Ice/InstrumentationI.cs index f04c5370861..fa1595115e0 100644 --- a/cs/src/Ice/InstrumentationI.cs +++ b/cs/src/Ice/InstrumentationI.cs @@ -16,6 +16,101 @@ namespace IceInternal using IceMX; + public class ObserverWithDelegate<T, O> : Observer<T> + where T : Metrics, new() + where O : Ice.Instrumentation.Observer + { + override public void + attach() + { + base.attach(); + if(delegate_ != null) + { + delegate_.attach(); + } + } + + override public void + detach() + { + base.detach(); + if(delegate_ != null) + { + delegate_.detach(); + } + } + + override public void + failed(string exceptionName) + { + base.failed(exceptionName); + if(delegate_ != null) + { + delegate_.failed(exceptionName); + } + } + + public O + getDelegate() + { + return delegate_; + } + + public void + setDelegate(O del) + { + delegate_ = del; + } + + public Observer getObserver<S, ObserverImpl, Observer>(string mapName, MetricsHelper<S> helper, Observer del) + where S : Metrics, new() + where ObserverImpl : ObserverWithDelegate<S, Observer>, Observer, new() + where Observer : Ice.Instrumentation.Observer + { + ObserverImpl obsv = base.getObserver<S, ObserverImpl>(mapName, helper); + if(obsv != null) + { + obsv.setDelegate(del); + return (Observer)obsv; + } + return del; + } + + protected O delegate_; + }; + + public class ObserverFactoryWithDelegate<T, OImpl, O> : ObserverFactory<T, OImpl> + where T : Metrics, new() + where OImpl : ObserverWithDelegate<T, O>, O, new() + where O : Ice.Instrumentation.Observer + { + public ObserverFactoryWithDelegate(IceInternal.MetricsAdminI metrics, string name) : base(metrics, name) + { + } + + public O getObserver(MetricsHelper<T> helper, O del) + { + OImpl o = base.getObserver(helper); + if(o != null) + { + o.setDelegate(del); + return o; + } + return del; + } + + public O getObserver(MetricsHelper<T> helper, object observer, O del) + { + OImpl o = base.getObserver(helper, observer); + if(o != null) + { + o.setDelegate(del); + return o; + } + return del; + } + } + static class AttrsUtil { public static void @@ -645,22 +740,31 @@ namespace IceInternal private Ice.EndpointInfo _endpointInfo; }; - public class ObserverI :Observer<Metrics> + public class ObserverWithDelegateI : ObserverWithDelegate<Metrics, Ice.Instrumentation.Observer> { }; - public class ConnectionObserverI : Observer<ConnectionMetrics>, Ice.Instrumentation.ConnectionObserver + public class ConnectionObserverI : ObserverWithDelegate<ConnectionMetrics, Ice.Instrumentation.ConnectionObserver>, + Ice.Instrumentation.ConnectionObserver { public void sentBytes(int num) { _sentBytes = num; forEach(sentBytesUpdate); + if(delegate_ != null) + { + delegate_.sentBytes(num); + } } public void receivedBytes(int num) { _receivedBytes = num; forEach(receivedBytesUpdate); + if(delegate_ != null) + { + delegate_.receivedBytes(num); + } } private void sentBytesUpdate(ConnectionMetrics v) @@ -677,12 +781,17 @@ namespace IceInternal private int _receivedBytes; }; - public class DispatchObserverI : Observer<DispatchMetrics>, Ice.Instrumentation.DispatchObserver + public class DispatchObserverI : ObserverWithDelegate<DispatchMetrics, Ice.Instrumentation.DispatchObserver>, + Ice.Instrumentation.DispatchObserver { public void userException() { forEach(userException); + if(delegate_ != null) + { + delegate_.userException(); + } } public void reply(int size) @@ -690,6 +799,10 @@ namespace IceInternal forEach((DispatchMetrics v) => { v.replySize += size; }); + if(delegate_ != null) + { + delegate_.reply(size); + } } private void userException(DispatchMetrics v) @@ -698,35 +811,56 @@ namespace IceInternal } } - public class RemoteObserverI : Observer<RemoteMetrics>, Ice.Instrumentation.RemoteObserver + public class RemoteObserverI : ObserverWithDelegate<RemoteMetrics, Ice.Instrumentation.RemoteObserver>, + Ice.Instrumentation.RemoteObserver { public void reply(int size) { forEach((RemoteMetrics v) => { v.replySize += size; }); + if(delegate_ != null) + { + delegate_.reply(size); + } } } - public class InvocationObserverI : Observer<InvocationMetrics>, Ice.Instrumentation.InvocationObserver + public class InvocationObserverI : ObserverWithDelegate<InvocationMetrics, Ice.Instrumentation.InvocationObserver>, + Ice.Instrumentation.InvocationObserver { public void userException() { forEach(userException); + if(delegate_ != null) + { + delegate_.userException(); + } } public void retried() { forEach(incrementRetry); + if(delegate_ != null) + { + delegate_.retried(); + } } public Ice.Instrumentation.RemoteObserver getRemoteObserver(Ice.ConnectionInfo con, Ice.Endpoint endpt, int requestId, int size) { - return getObserver<RemoteMetrics, RemoteObserverI>("Remote", - new RemoteInvocationHelper(con, endpt, requestId, size)); + Ice.Instrumentation.RemoteObserver del = null; + if(delegate_ != null) + { + del = delegate_.getRemoteObserver(con, endpt, requestId, size); + } + return getObserver<RemoteMetrics, RemoteObserverI, + Ice.Instrumentation.RemoteObserver>("Remote", + new RemoteInvocationHelper(con, endpt, requestId, size), + del); } private void incrementRetry(InvocationMetrics v) @@ -740,13 +874,18 @@ namespace IceInternal } } - public class ThreadObserverI : Observer<ThreadMetrics>, Ice.Instrumentation.ThreadObserver + public class ThreadObserverI : ObserverWithDelegate<ThreadMetrics, Ice.Instrumentation.ThreadObserver>, + Ice.Instrumentation.ThreadObserver { public void stateChanged(Ice.Instrumentation.ThreadState oldState, Ice.Instrumentation.ThreadState newState) { _oldState = oldState; _newState = newState; forEach(threadStateUpdate); + if(delegate_ != null) + { + delegate_.stateChanged(oldState, newState); + } } private void threadStateUpdate(ThreadMetrics v) @@ -787,15 +926,27 @@ namespace IceInternal public class CommunicatorObserverI : Ice.Instrumentation.CommunicatorObserver { - public CommunicatorObserverI(IceInternal.MetricsAdminI metrics) + public CommunicatorObserverI(IceInternal.MetricsAdminI metrics) : this(metrics, null) + { + } + + public CommunicatorObserverI(IceInternal.MetricsAdminI metrics, + Ice.Instrumentation.CommunicatorObserver del) { _metrics = metrics; - _connections = new ObserverFactory<ConnectionMetrics, ConnectionObserverI>(metrics, "Connection"); - _dispatch = new ObserverFactory<DispatchMetrics, DispatchObserverI>(metrics, "Dispatch"); - _invocations = new ObserverFactory<InvocationMetrics, InvocationObserverI>(metrics, "Invocation"); - _threads = new ObserverFactory<ThreadMetrics, ThreadObserverI>(metrics, "Thread"); - _connects = new ObserverFactory<Metrics, ObserverI>(metrics, "ConnectionEstablishment"); - _endpointLookups = new ObserverFactory<Metrics, ObserverI>(metrics, "EndpointLookup"); + _delegate = del; + _connections = new ObserverFactoryWithDelegate<ConnectionMetrics, ConnectionObserverI, + Ice.Instrumentation.ConnectionObserver>(metrics, "Connection"); + _dispatch = new ObserverFactoryWithDelegate<DispatchMetrics, DispatchObserverI, + Ice.Instrumentation.DispatchObserver>(metrics, "Dispatch"); + _invocations = new ObserverFactoryWithDelegate<InvocationMetrics, InvocationObserverI, + Ice.Instrumentation.InvocationObserver>(metrics, "Invocation"); + _threads = new ObserverFactoryWithDelegate<ThreadMetrics, ThreadObserverI, + Ice.Instrumentation.ThreadObserver>(metrics, "Thread"); + _connects = new ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI, + Ice.Instrumentation.Observer>(metrics, "ConnectionEstablishment"); + _endpointLookups = new ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI, + Ice.Instrumentation.Observer>(metrics, "EndpointLookup"); try { @@ -811,7 +962,19 @@ namespace IceInternal { if(_connects.isEnabled()) { - return _connects.getObserver(new EndpointHelper(endpt, connector)); + try + { + Ice.Instrumentation.Observer del = null; + if(_delegate != null) + { + del = _delegate.getConnectionEstablishmentObserver(endpt, connector); + } + return _connects.getObserver(new EndpointHelper(endpt, connector), del); + } + catch(Exception ex) + { + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + } } return null; } @@ -820,29 +983,68 @@ namespace IceInternal { if(_endpointLookups.isEnabled()) { - return _endpointLookups.getObserver(new EndpointHelper(endpt)); + try + { + Ice.Instrumentation.Observer del = null; + if(_delegate != null) + { + del = _delegate.getEndpointLookupObserver(endpt); + } + return _endpointLookups.getObserver(new EndpointHelper(endpt), del); + } + catch(Exception ex) + { + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + } } return null; } - public Ice.Instrumentation.ConnectionObserver getConnectionObserver(Ice.ConnectionInfo c, Ice.Endpoint e, + public Ice.Instrumentation.ConnectionObserver getConnectionObserver(Ice.ConnectionInfo c, + Ice.Endpoint e, Ice.Instrumentation.ConnectionState s, - Ice.Instrumentation.ConnectionObserver o) + Ice.Instrumentation.ConnectionObserver obsv) { if(_connections.isEnabled()) { - return _connections.getObserver(new ConnectionHelper(c, e, s), o); + try + { + Ice.Instrumentation.ConnectionObserver del = null; + ConnectionObserverI o = obsv is ConnectionObserverI ? (ConnectionObserverI)obsv : null; + if(_delegate != null) + { + del = _delegate.getConnectionObserver(c, e, s, o != null ? o.getDelegate() : obsv); + } + return _connections.getObserver(new ConnectionHelper(c, e, s), obsv, del); + } + catch(Exception ex) + { + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + } } return null; } public Ice.Instrumentation.ThreadObserver getThreadObserver(string parent, string id, Ice.Instrumentation.ThreadState s, - Ice.Instrumentation.ThreadObserver o) + Ice.Instrumentation.ThreadObserver obsv) { if(_threads.isEnabled()) { - return _threads.getObserver(new ThreadHelper(parent, id, s), o); + try + { + Ice.Instrumentation.ThreadObserver del = null; + ThreadObserverI o = obsv is ThreadObserverI ? (ThreadObserverI)obsv : null; + if(_delegate != null) + { + del = _delegate.getThreadObserver(parent, id, s, o != null ? o.getDelegate() : obsv); + } + return _threads.getObserver(new ThreadHelper(parent, id, s), obsv, del); + } + catch(Exception ex) + { + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + } } return null; } @@ -852,7 +1054,19 @@ namespace IceInternal { if(_invocations.isEnabled()) { - return _invocations.getObserver(new InvocationHelper(prx, operation, ctx)); + try + { + Ice.Instrumentation.InvocationObserver del = null; + if(_delegate != null) + { + del = _delegate.getInvocationObserver(prx, operation, ctx); + } + return _invocations.getObserver(new InvocationHelper(prx, operation, ctx), del); + } + catch(Exception ex) + { + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + } } return null; } @@ -861,7 +1075,19 @@ namespace IceInternal { if(_dispatch.isEnabled()) { - return _dispatch.getObserver(new DispatchHelper(c, size)); + try + { + Ice.Instrumentation.DispatchObserver del = null; + if(_delegate != null) + { + del = _delegate.getDispatchObserver(c, size); + } + return _dispatch.getObserver(new DispatchHelper(c, size), del); + } + catch(Exception ex) + { + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + } } return null; } @@ -870,6 +1096,10 @@ namespace IceInternal { _connections.setUpdater(updater.updateConnectionObservers); _threads.setUpdater(updater.updateThreadObservers); + if(_delegate != null) + { + _delegate.setObserverUpdater(updater); + } } public IceInternal.MetricsAdminI getMetricsAdmin() @@ -878,11 +1108,18 @@ namespace IceInternal } readonly private IceInternal.MetricsAdminI _metrics; - readonly private ObserverFactory<ConnectionMetrics, ConnectionObserverI> _connections; - readonly private ObserverFactory<DispatchMetrics, DispatchObserverI> _dispatch; - readonly private ObserverFactory<InvocationMetrics, InvocationObserverI> _invocations; - readonly private ObserverFactory<ThreadMetrics, ThreadObserverI> _threads; - readonly private ObserverFactory<Metrics, ObserverI> _connects; - readonly private ObserverFactory<Metrics, ObserverI> _endpointLookups; + readonly private Ice.Instrumentation.CommunicatorObserver _delegate; + readonly private ObserverFactoryWithDelegate<ConnectionMetrics, ConnectionObserverI, + Ice.Instrumentation.ConnectionObserver> _connections; + readonly private ObserverFactoryWithDelegate<DispatchMetrics, DispatchObserverI, + Ice.Instrumentation.DispatchObserver> _dispatch; + readonly private ObserverFactoryWithDelegate<InvocationMetrics, InvocationObserverI, + Ice.Instrumentation.InvocationObserver> _invocations; + readonly private ObserverFactoryWithDelegate<ThreadMetrics, ThreadObserverI, + Ice.Instrumentation.ThreadObserver> _threads; + readonly private ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI, + Ice.Instrumentation.Observer> _connects; + readonly private ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI, + Ice.Instrumentation.Observer> _endpointLookups; } }
\ No newline at end of file diff --git a/cs/src/Ice/MetricsObserverI.cs b/cs/src/Ice/MetricsObserverI.cs index ee4d70a81c0..d92ff87ccd5 100644 --- a/cs/src/Ice/MetricsObserverI.cs +++ b/cs/src/Ice/MetricsObserverI.cs @@ -214,12 +214,12 @@ namespace IceMX { public delegate void MetricsUpdate(T m); - public void attach() + virtual public void attach() { Start(); } - public void detach() + virtual public void detach() { Stop(); long lifetime = _previousDelay + (long)(ElapsedTicks / (Frequency / 1000000.0)); @@ -229,7 +229,7 @@ namespace IceMX } } - public void failed(string exceptionName) + virtual public void failed(string exceptionName) { foreach(MetricsMap<T>.Entry e in _objects) { @@ -349,7 +349,14 @@ namespace IceMX lock(this) { List<MetricsMap<T>.Entry> metricsObjects = null; - O old = (O)observer; + O old = null; + try + { + old = (O)observer; + } + catch(InvalidCastException) + { + } foreach(MetricsMap<T> m in _maps) { MetricsMap<T>.Entry e = m.getMatching(helper, old != null ? old.getEntry(m) : null); diff --git a/cs/src/Ice/ObserverHelper.cs b/cs/src/Ice/ObserverHelper.cs index 677350499af..dafa0188c60 100644 --- a/cs/src/Ice/ObserverHelper.cs +++ b/cs/src/Ice/ObserverHelper.cs @@ -16,7 +16,7 @@ namespace IceInternal { static public InvocationObserver get(Instance instance, string op) { - CommunicatorObserver obsv = instance.initializationData().observer; + CommunicatorObserver obsv = instance.getObserver(); if(obsv != null) { InvocationObserver observer = obsv.getInvocationObserver(null, op, _emptyContext); @@ -36,8 +36,7 @@ namespace IceInternal static public InvocationObserver get(Ice.ObjectPrx proxy, string op, Dictionary<string, string> context) { - CommunicatorObserver obsv = - ((Ice.ObjectPrxHelperBase)proxy).reference__().getInstance().initializationData().observer; + CommunicatorObserver obsv = ((Ice.ObjectPrxHelperBase)proxy).reference__().getInstance().getObserver(); if(obsv != null) { InvocationObserver observer; diff --git a/cs/src/Ice/ThreadPool.cs b/cs/src/Ice/ThreadPool.cs index 582e5d1ed17..4a00255cdfd 100644 --- a/cs/src/Ice/ThreadPool.cs +++ b/cs/src/Ice/ThreadPool.cs @@ -761,7 +761,7 @@ namespace IceInternal public void updateObserver() { // Must be called with the thread pool mutex locked - Ice.Instrumentation.CommunicatorObserver obsv = _threadPool._instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _threadPool._instance.getObserver(); if(obsv != null) { _observer = obsv.getThreadObserver(_threadPool._prefix, _name, _state, _observer); diff --git a/cs/test/Ice/metrics/AllTests.cs b/cs/test/Ice/metrics/AllTests.cs index e091fca7f35..99c8d9353bd 100644 --- a/cs/test/Ice/metrics/AllTests.cs +++ b/cs/test/Ice/metrics/AllTests.cs @@ -377,9 +377,9 @@ public class AllTests : TestCommon.TestApp } #if SILVERLIGHT - override public void run(Ice.Communicator communicator) + override public void run(Ice.Communicator communicator, CommunicatorObserverI obsv) #else - public static MetricsPrx allTests(Ice.Communicator communicator) + public static MetricsPrx allTests(Ice.Communicator communicator, CommunicatorObserverI obsv) #endif { MetricsPrx metrics = MetricsPrxHelper.checkedCast(communicator.stringToProxy("metrics:default -p 12010")); @@ -1057,6 +1057,41 @@ public class AllTests : TestCommon.TestApp WriteLine("ok"); + Write("testing instrumentation observer delegate... "); + Flush(); + + test(obsv.threadObserver.total > 0); + test(obsv.connectionObserver.total > 0); + test(obsv.connectionEstablishmentObserver.total > 0); + test(obsv.endpointLookupObserver.total > 0); + test(obsv.dispatchObserver.total > 0); + test(obsv.invocationObserver.total > 0); + test(obsv.invocationObserver.remoteObserver.total > 0); + + test(obsv.threadObserver.current > 0); + test(obsv.connectionObserver.current > 0); + test(obsv.connectionEstablishmentObserver.current == 0); + test(obsv.endpointLookupObserver.current == 0); + test(obsv.dispatchObserver.current == 0); + test(obsv.invocationObserver.current == 0); + test(obsv.invocationObserver.remoteObserver.current == 0); + + test(obsv.threadObserver.failedCount == 0); + test(obsv.connectionObserver.failedCount > 0); + test(obsv.connectionEstablishmentObserver.failedCount > 0); + test(obsv.endpointLookupObserver.failedCount > 0); + //test(obsv.dispatchObserver.failedCount > 0); + test(obsv.invocationObserver.failedCount > 0); + test(obsv.invocationObserver.remoteObserver.failedCount > 0); + + test(obsv.threadObserver.states > 0); + test(obsv.connectionObserver.received > 0 && obsv.connectionObserver.sent > 0); + //test(obsv.dispatchObserver.userExceptionCount > 0); + test(obsv.invocationObserver.userExceptionCount > 0 && obsv.invocationObserver.retriedCount > 0); + test(obsv.invocationObserver.remoteObserver.replySize > 0); + + WriteLine("ok"); + #if SILVERLIGHT metrics.shutdown(); #else diff --git a/cs/test/Ice/metrics/Client.cs b/cs/test/Ice/metrics/Client.cs index 566db6a836d..657b21adc94 100644 --- a/cs/test/Ice/metrics/Client.cs +++ b/cs/test/Ice/metrics/Client.cs @@ -21,7 +21,7 @@ public class Client { private static int run(string[] args, Ice.Communicator communicator) { - Test.MetricsPrx metrics = AllTests.allTests(communicator); + Test.MetricsPrx metrics = AllTests.allTests(communicator, _observer); metrics.shutdown(); return 0; } @@ -52,6 +52,7 @@ public class Client // initData.properties.setProperty("Ice.FactoryAssemblies", "client"); #endif + initData.observer = _observer; communicator = Ice.Util.initialize(ref args, initData); status = run(args, communicator); } @@ -76,4 +77,6 @@ public class Client return status; } + + static CommunicatorObserverI _observer = new CommunicatorObserverI(); } diff --git a/cs/test/Ice/metrics/InstrumentationI.cs b/cs/test/Ice/metrics/InstrumentationI.cs new file mode 100644 index 00000000000..f6811e37fc7 --- /dev/null +++ b/cs/test/Ice/metrics/InstrumentationI.cs @@ -0,0 +1,376 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +using System; +using System.Collections.Generic; +using System.Diagnostics; + +using Test; + +public class ObserverI : Ice.Instrumentation.Observer +{ + virtual public void + reset() + { + lock(this) + { + total = 0; + current = 0; + failedCount = 0; + } + } + + public void + attach() + { + lock(this) + { + ++total; + ++current; + } + } + public void + detach() + { + lock(this) + { + --current; + } + } + public void + failed(String s) + { + lock(this) + { + ++failedCount; + } + } + + public int total; + public int current; + public int failedCount; +}; + +public class RemoteObserverI : ObserverI, Ice.Instrumentation.RemoteObserver +{ + override public void + reset() + { + lock(this) + { + base.reset(); + replySize = 0; + } + } + + public void + reply(int s) + { + lock(this) + { + replySize += s; + } + } + + public int replySize; +}; + +public class InvocationObserverI : ObserverI , Ice.Instrumentation.InvocationObserver +{ + override public void + reset() + { + lock(this) + { + base.reset(); + retriedCount = 0; + userExceptionCount = 0; + if(remoteObserver != null) + { + remoteObserver.reset(); + } + } + } + + public void + retried() + { + lock(this) + { + ++retriedCount; + } + } + + public void + userException() + { + lock(this) + { + ++userExceptionCount; + } + } + + public Ice.Instrumentation.RemoteObserver + getRemoteObserver(Ice.ConnectionInfo c, Ice.Endpoint e, int a, int b) + { + lock(this) + { + if(remoteObserver == null) + { + remoteObserver = new RemoteObserverI(); + remoteObserver.reset(); + } + return remoteObserver; + } + } + + public int userExceptionCount; + public int retriedCount; + + public RemoteObserverI remoteObserver = null; +}; + +public class DispatchObserverI : ObserverI , Ice.Instrumentation.DispatchObserver +{ + override public void + reset() + { + lock(this) + { + base.reset(); + userExceptionCount = 0; + replySize = 0; + } + } + + public void + userException() + { + lock(this) + { + ++userExceptionCount; + } + } + + public void + reply(int s) + { + lock(this) + { + replySize += s; + } + } + + public int userExceptionCount; + public int replySize; +}; + +public class ConnectionObserverI : ObserverI , Ice.Instrumentation.ConnectionObserver +{ + override public void + reset() + { + lock(this) + { + base.reset(); + received = 0; + sent = 0; + } + } + + public void + sentBytes(int s) + { + lock(this) + { + sent += s; + } + } + + public void + receivedBytes(int s) + { + lock(this) + { + received += s; + } + } + + public int sent; + public int received; +}; + +public class ThreadObserverI : ObserverI , Ice.Instrumentation.ThreadObserver +{ + override public void + reset() + { + lock(this) + { + base.reset(); + states = 0; + } + } + + public void + stateChanged(Ice.Instrumentation.ThreadState o, Ice.Instrumentation.ThreadState n) + { + lock(this) + { + ++states; + } + } + + public int states; +}; + +public class CommunicatorObserverI : Ice.Instrumentation.CommunicatorObserver +{ + public void + setObserverUpdater(Ice.Instrumentation.ObserverUpdater u) + { + lock(this) + { + updater = u; + } + } + + public Ice.Instrumentation.Observer + getConnectionEstablishmentObserver(Ice.Endpoint e, String s) + { + lock(this) + { + if(connectionEstablishmentObserver == null) + { + connectionEstablishmentObserver = new ObserverI(); + connectionEstablishmentObserver.reset(); + } + return connectionEstablishmentObserver; + } + } + + + public Ice.Instrumentation.Observer + getEndpointLookupObserver(Ice.Endpoint e) + { + lock(this) + { + if(endpointLookupObserver == null) + { + endpointLookupObserver = new ObserverI(); + endpointLookupObserver.reset(); + } + return endpointLookupObserver; + } + } + + public Ice.Instrumentation.ConnectionObserver + getConnectionObserver(Ice.ConnectionInfo c, + Ice.Endpoint e, + Ice.Instrumentation.ConnectionState s, + Ice.Instrumentation.ConnectionObserver old) + { + lock(this) + { + Debug.Assert(old == null || old is ConnectionObserverI); + if(connectionObserver == null) + { + connectionObserver = new ConnectionObserverI(); + connectionObserver.reset(); + } + return connectionObserver; + } + } + + public Ice.Instrumentation.ThreadObserver + getThreadObserver(String p, String id, Ice.Instrumentation.ThreadState s, + Ice.Instrumentation.ThreadObserver old) + { + lock(this) + { + Debug.Assert(old == null || old is ThreadObserverI); + if(threadObserver == null) + { + threadObserver = new ThreadObserverI(); + threadObserver.reset(); + } + return threadObserver; + } + } + + public Ice.Instrumentation.InvocationObserver + getInvocationObserver(Ice.ObjectPrx p, String op, Dictionary<String, String> ctx) + { + lock(this) + { + if(invocationObserver == null) + { + invocationObserver = new InvocationObserverI(); + invocationObserver.reset(); + } + return invocationObserver; + } + } + + public Ice.Instrumentation.DispatchObserver + getDispatchObserver(Ice.Current current, int s) + { + lock(this) + { + if(dispatchObserver == null) + { + dispatchObserver = new DispatchObserverI(); + dispatchObserver.reset(); + } + return dispatchObserver; + } + } + + void + reset() + { + lock(this) + { + if(connectionEstablishmentObserver != null) + { + connectionEstablishmentObserver.reset(); + } + if(endpointLookupObserver != null) + { + endpointLookupObserver.reset(); + } + if(connectionObserver != null) + { + connectionObserver.reset(); + } + if(threadObserver != null) + { + threadObserver.reset(); + } + if(invocationObserver != null) + { + invocationObserver.reset(); + } + if(dispatchObserver != null) + { + dispatchObserver.reset(); + } + } + } + + protected Ice.Instrumentation.ObserverUpdater updater; + + public ObserverI connectionEstablishmentObserver; + public ObserverI endpointLookupObserver; + public ConnectionObserverI connectionObserver; + public ThreadObserverI threadObserver; + public InvocationObserverI invocationObserver; + public DispatchObserverI dispatchObserver; +}; + diff --git a/cs/test/Ice/metrics/Makefile b/cs/test/Ice/metrics/Makefile index 9fc55e3d052..5d5b2abf1bc 100644 --- a/cs/test/Ice/metrics/Makefile +++ b/cs/test/Ice/metrics/Makefile @@ -11,7 +11,7 @@ top_srcdir = ../../.. TARGETS = client.exe server.exe serveramd.exe -C_SRCS = AllTests.cs Client.cs ../../TestCommon/TestApp.cs +C_SRCS = AllTests.cs Client.cs InstrumentationI.cs ../../TestCommon/TestApp.cs S_SRCS = MetricsI.cs Server.cs SAMD_SRCS = MetricsAMDI.cs Server.cs diff --git a/cs/test/Ice/metrics/Makefile.mak b/cs/test/Ice/metrics/Makefile.mak index d50449baffc..c488a767153 100644 --- a/cs/test/Ice/metrics/Makefile.mak +++ b/cs/test/Ice/metrics/Makefile.mak @@ -11,7 +11,7 @@ top_srcdir = ..\..\.. TARGETS = client.exe server.exe serveramd.exe -C_SRCS = AllTests.cs Client.cs ..\..\TestCommon\TestApp.cs +C_SRCS = AllTests.cs Client.cs InstrumentationI.cs ..\..\TestCommon\TestApp.cs S_SRCS = MetricsI.cs Server.cs SAMD_SRCS = MetricsAMDI.cs Server.cs diff --git a/distribution/lib/DistUtils.py b/distribution/lib/DistUtils.py index baaca0d5f49..82916337630 100644 --- a/distribution/lib/DistUtils.py +++ b/distribution/lib/DistUtils.py @@ -435,7 +435,7 @@ def fixPermission(dest): def tarArchive(dir, verbose = False, archiveDir = None): dist = os.path.basename(dir) - sys.stdout.write(" creating " + dist + ".tar.gz ...") + sys.stdout.write(" creating " + dist + ".tar.gz... ") sys.stdout.flush() cwd = os.getcwd() @@ -486,7 +486,7 @@ def untarArchive(archive, verbose = False, archiveDir = None): def zipArchive(dir, verbose = False, archiveDir = None): dist = os.path.basename(dir) - sys.stdout.write(" creating " + dist + ".zip ...") + sys.stdout.write(" creating " + dist + ".zip... ") sys.stdout.flush() cwd = os.getcwd() diff --git a/distribution/makedist.py b/distribution/makedist.py index f473cc8765e..c60295cf544 100755 --- a/distribution/makedist.py +++ b/distribution/makedist.py @@ -15,10 +15,53 @@ from DistUtils import * import FixUtil # +# Windows files that will be excluded from Unix source distributions. +# +excludeWindowsFiles = [ \ + "/vsaddin/", + "*.rc", + "*.sln", + "*.csproj", + "*.vbproj", + "*.vcproj", + "*.vcxproj", + "*.vcxproj.filters", + "Make*mak*", + "Make.rules.msvc", + ".depend.mak", + "*.exe.config", + "MSG00001.bin", + "/cpp/test/WinRT", + "/cpp/demo/**/generated", + "/cpp/demo/**/MFC", + "/cpp/**/winrt" + "/cs/**/sl", + "/cs/**/compact", + "/cs/**/cf", +] + +# +# Unix files that will be excluded from Windows source distributions. +# +excludeUnixFiles = [ \ +] +for l in ["/java", "/py", "/php", "/cs", "/cpp/demo"]: + excludeUnixFiles += [ + l + "/**/Makefile", + l + "/**/Make.rules.", + l + "/**/Make.rules.cs", + l + "/**/Make.rules.php", + l + "/**/Make.rules", + l + "/**/Make.rules.Darwin", + l + "/**/Make.rules.Linux", + l + "/**/.depend" + ] + +# # Files from the top-level, cpp, java and cs config directories to include in the demo # source distribution config directory. # -configFiles = [ \ +demoConfigFiles = [ \ "Make.*", \ "common.xml", \ ] @@ -27,7 +70,7 @@ configFiles = [ \ # Files from the top-level certs directory to include in the demo distribution certs # directory. # -certsFiles = [ \ +demoCertsFiles = [ \ "*.jks", \ "*.pem", \ "*.pfx", \ @@ -563,9 +606,9 @@ sys.stdout.flush() copy("ICE_LICENSE", demoDir) copy(os.path.join(distFilesDir, "src", "common", "README.DEMOS"), demoDir) -copyMatchingFiles(os.path.join("certs"), os.path.join(demoDir, "certs"), certsFiles) +copyMatchingFiles(os.path.join("certs"), os.path.join(demoDir, "certs"), demoCertsFiles) for d in ["", "cpp", "java", "cs"]: - copyMatchingFiles(os.path.join(d, "config"), os.path.join(demoDir, "config"), configFiles) + copyMatchingFiles(os.path.join(d, "config"), os.path.join(demoDir, "config"), demoConfigFiles) copy(os.path.join(distFilesDir, "src", "common", "Make.rules"), os.path.join(demoDir, "config"), False) copy(os.path.join(distFilesDir, "src", "common", "Make.rules.cs"), os.path.join(demoDir, "config"), False) @@ -599,7 +642,7 @@ remove(os.path.join(srcDir, 'vb')) # Windows demo distribution copy(os.path.join(winDistFilesDir, "src", "common", "README.DEMOS.txt"), os.path.join(winDemoDir, "README.txt")) -copyMatchingFiles(os.path.join(winSrcDir, "certs"), os.path.join(winDemoDir, "certs"), certsFiles) +copyMatchingFiles(os.path.join(winSrcDir, "certs"), os.path.join(winDemoDir, "certs"), demoCertsFiles) os.mkdir(os.path.join(winDemoDir, "config")) diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java index 82fd56d46a6..21b6525a05a 100644 --- a/java/src/Ice/CommunicatorI.java +++ b/java/src/Ice/CommunicatorI.java @@ -140,9 +140,10 @@ public final class CommunicatorI implements Communicator return _instance.initializationData().stats; } - public Ice.Instrumentation.CommunicatorObserver getObserver() + public Ice.Instrumentation.CommunicatorObserver + getObserver() { - return _instance.initializationData().observer; + return _instance.getObserver(); } public RouterPrx diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 32bf6308d05..bdd49e86ab7 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -257,11 +257,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return; } - assert(_instance.initializationData().observer != null); - _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(), - _endpoint, - toConnectionState(_state), - _observer); + assert(_instance.getObserver() != null); + _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), + _endpoint, + toConnectionState(_state), + _observer); if(_observer != null) { _observer.attach(); @@ -1831,16 +1831,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - if(_instance.initializationData().observer != null) + if(_instance.getObserver() != null) { Ice.Instrumentation.ConnectionState oldState = toConnectionState(_state); Ice.Instrumentation.ConnectionState newState = toConnectionState(state); if(oldState != newState) { - _observer = _instance.initializationData().observer.getConnectionObserver(initConnectionInfo(), - _endpoint, - newState, - _observer); + _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), + _endpoint, + newState, + _observer); if(_observer != null) { _observer.attach(); diff --git a/java/src/IceInternal/CommunicatorObserverI.java b/java/src/IceInternal/CommunicatorObserverI.java index 056ef2cd007..78c44da0905 100644 --- a/java/src/IceInternal/CommunicatorObserverI.java +++ b/java/src/IceInternal/CommunicatorObserverI.java @@ -583,16 +583,27 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb public CommunicatorObserverI(IceInternal.MetricsAdminI metrics) { - _metrics = metrics; + this(metrics, null); + } - _connections = new ObserverFactory<ConnectionMetrics, ConnectionObserverI>(metrics, "Connection", - ConnectionMetrics.class); - _dispatch = new ObserverFactory<DispatchMetrics, DispatchObserverI>(metrics, "Dispatch", DispatchMetrics.class); - _invocations = new ObserverFactory<InvocationMetrics, InvocationObserverI>(metrics, "Invocation", - InvocationMetrics.class); - _threads = new ObserverFactory<ThreadMetrics, ThreadObserverI>(metrics, "Thread", ThreadMetrics.class); - _connects = new ObserverFactory<Metrics, ObserverI>(metrics, "ConnectionEstablishment", Metrics.class); - _endpointLookups = new ObserverFactory<Metrics, ObserverI>(metrics, "EndpointLookup", Metrics.class); + public + CommunicatorObserverI(IceInternal.MetricsAdminI metrics, Ice.Instrumentation.CommunicatorObserver delegate) + { + _metrics = metrics; + _delegate = delegate; + + _connections = new ObserverFactoryWithDelegate<ConnectionMetrics, ConnectionObserverI, + Ice.Instrumentation.ConnectionObserver>(metrics, "Connection", ConnectionMetrics.class); + _dispatch = new ObserverFactoryWithDelegate<DispatchMetrics, DispatchObserverI, + Ice.Instrumentation.DispatchObserver>(metrics, "Dispatch", DispatchMetrics.class); + _invocations = new ObserverFactoryWithDelegate<InvocationMetrics, InvocationObserverI, + Ice.Instrumentation.InvocationObserver>(metrics, "Invocation", InvocationMetrics.class); + _threads = new ObserverFactoryWithDelegate<ThreadMetrics, ThreadObserverI, + Ice.Instrumentation.ThreadObserver>(metrics, "Thread", ThreadMetrics.class); + _connects = new ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI, + Ice.Instrumentation.Observer>(metrics, "ConnectionEstablishment", Metrics.class); + _endpointLookups = new ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI, + Ice.Instrumentation.Observer>(metrics, "EndpointLookup", Metrics.class); try { @@ -612,11 +623,17 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb { try { - return _connects.getObserver(new EndpointHelper(endpt, connector), ObserverI.class); + Ice.Instrumentation.Observer delegate = null; + if(_delegate != null) + { + delegate = _delegate.getConnectionEstablishmentObserver(endpt, connector); + } + return _connects.getObserver(new EndpointHelper(endpt, connector), ObserverWithDelegateI.class, + delegate); } catch(Exception ex) { - _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex)); } } return null; @@ -629,11 +646,16 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb { try { - return _endpointLookups.getObserver(new EndpointHelper(endpt), ObserverI.class); + Ice.Instrumentation.Observer delegate = null; + if(_delegate != null) + { + delegate = _delegate.getEndpointLookupObserver(endpt); + } + return _endpointLookups.getObserver(new EndpointHelper(endpt), ObserverWithDelegateI.class, delegate); } catch(Exception ex) { - _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex)); } } @@ -642,34 +664,47 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb public Ice.Instrumentation.ConnectionObserver getConnectionObserver(Ice.ConnectionInfo c, Ice.Endpoint e, Ice.Instrumentation.ConnectionState s, - Ice.Instrumentation.ConnectionObserver o) + Ice.Instrumentation.ConnectionObserver observer) { if(_connections.isEnabled()) { try { - return _connections.getObserver(new ConnectionHelper(c, e, s), o, ConnectionObserverI.class); + Ice.Instrumentation.ConnectionObserver delegate = null; + ConnectionObserverI o = observer instanceof ConnectionObserverI ? (ConnectionObserverI)observer : null; + if(_delegate != null) + { + delegate = _delegate.getConnectionObserver(c, e, s, o != null ? o.getDelegate() : observer); + } + return _connections.getObserver(new ConnectionHelper(c, e, s), o, ConnectionObserverI.class, delegate); } catch(Exception ex) { - _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex)); } } return null; } public Ice.Instrumentation.ThreadObserver - getThreadObserver(String parent, String id, Ice.Instrumentation.ThreadState s, Ice.Instrumentation.ThreadObserver o) + getThreadObserver(String parent, String id, Ice.Instrumentation.ThreadState s, + Ice.Instrumentation.ThreadObserver observer) { if(_threads.isEnabled()) { try { - return _threads.getObserver(new ThreadHelper(parent, id, s), o, ThreadObserverI.class); + Ice.Instrumentation.ThreadObserver delegate = null; + ThreadObserverI o = observer instanceof ThreadObserverI ? (ThreadObserverI)observer : null; + if(_delegate != null) + { + delegate = _delegate.getThreadObserver(parent, id, s, o != null ? o.getDelegate() : observer); + } + return _threads.getObserver(new ThreadHelper(parent, id, s), o, ThreadObserverI.class, delegate); } catch(Exception ex) { - _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex)); } } return null; @@ -682,11 +717,18 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb { try { - return _invocations.getObserver(new InvocationHelper(prx, operation, ctx), InvocationObserverI.class); + Ice.Instrumentation.InvocationObserver delegate = null; + if(_delegate != null) + { + delegate = _delegate.getInvocationObserver(prx, operation, ctx); + } + return _invocations.getObserver(new InvocationHelper(prx, operation, ctx), + InvocationObserverI.class, + delegate); } catch(Exception ex) { - _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex)); } } return null; @@ -699,11 +741,16 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb { try { - return _dispatch.getObserver(new DispatchHelper(c, size), DispatchObserverI.class); + Ice.Instrumentation.DispatchObserver delegate = null; + if(_delegate != null) + { + delegate = _delegate.getDispatchObserver(c, size); + } + return _dispatch.getObserver(new DispatchHelper(c, size), DispatchObserverI.class, delegate); } catch(Exception ex) { - _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + ex); + _metrics.getLogger().error("unexpected exception trying to obtain observer:\n" + Ex.toString(ex)); } } return null; @@ -726,6 +773,11 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb updater.updateThreadObservers(); } }); + + if(_delegate != null) + { + _delegate.setObserverUpdater(updater); + } } public IceInternal.MetricsAdminI getMetricsAdmin() @@ -734,10 +786,17 @@ public class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorOb } final private IceInternal.MetricsAdminI _metrics; - final private ObserverFactory<ConnectionMetrics, ConnectionObserverI> _connections; - final private ObserverFactory<DispatchMetrics, DispatchObserverI> _dispatch; - final private ObserverFactory<InvocationMetrics, InvocationObserverI> _invocations; - final private ObserverFactory<ThreadMetrics, ThreadObserverI> _threads; - final private ObserverFactory<Metrics, ObserverI> _connects; - final private ObserverFactory<Metrics, ObserverI> _endpointLookups; + final private Ice.Instrumentation.CommunicatorObserver _delegate; + final private ObserverFactoryWithDelegate<ConnectionMetrics, ConnectionObserverI, + Ice.Instrumentation.ConnectionObserver> _connections; + final private ObserverFactoryWithDelegate<DispatchMetrics, DispatchObserverI, + Ice.Instrumentation.DispatchObserver> _dispatch; + final private ObserverFactoryWithDelegate<InvocationMetrics, InvocationObserverI, + Ice.Instrumentation.InvocationObserver> _invocations; + final private ObserverFactoryWithDelegate<ThreadMetrics, ThreadObserverI, + Ice.Instrumentation.ThreadObserver> _threads; + final private ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI, + Ice.Instrumentation.Observer> _connects; + final private ObserverFactoryWithDelegate<Metrics, ObserverWithDelegateI, + Ice.Instrumentation.Observer> _endpointLookups; } diff --git a/java/src/IceInternal/ConnectionObserverI.java b/java/src/IceInternal/ConnectionObserverI.java index 847b0216ec8..9aa68be1280 100644 --- a/java/src/IceInternal/ConnectionObserverI.java +++ b/java/src/IceInternal/ConnectionObserverI.java @@ -9,7 +9,8 @@ package IceInternal; -public class ConnectionObserverI extends IceMX.Observer<IceMX.ConnectionMetrics> +public class ConnectionObserverI + extends IceMX.ObserverWithDelegate<IceMX.ConnectionMetrics, Ice.Instrumentation.ConnectionObserver> implements Ice.Instrumentation.ConnectionObserver { public void @@ -17,6 +18,10 @@ public class ConnectionObserverI extends IceMX.Observer<IceMX.ConnectionMetrics> { _sentBytes = num; forEach(_sentBytesUpdate); + if(_delegate != null) + { + _delegate.sentBytes(num); + } } public void @@ -24,6 +29,10 @@ public class ConnectionObserverI extends IceMX.Observer<IceMX.ConnectionMetrics> { _receivedBytes = num; forEach(_receivedBytesUpdate); + if(_delegate != null) + { + _delegate.receivedBytes(num); + } } private MetricsUpdate<IceMX.ConnectionMetrics> _sentBytesUpdate = new MetricsUpdate<IceMX.ConnectionMetrics>() diff --git a/java/src/IceInternal/DispatchObserverI.java b/java/src/IceInternal/DispatchObserverI.java index 97dadc19d0d..c5366c4ac69 100644 --- a/java/src/IceInternal/DispatchObserverI.java +++ b/java/src/IceInternal/DispatchObserverI.java @@ -9,13 +9,18 @@ package IceInternal; -public class DispatchObserverI extends IceMX.Observer<IceMX.DispatchMetrics> +public class DispatchObserverI + extends IceMX.ObserverWithDelegate<IceMX.DispatchMetrics, Ice.Instrumentation.DispatchObserver> implements Ice.Instrumentation.DispatchObserver { public void userException() { forEach(_userException); + if(_delegate != null) + { + _delegate.userException(); + } } public void @@ -29,6 +34,10 @@ public class DispatchObserverI extends IceMX.Observer<IceMX.DispatchMetrics> v.replySize += size; } }); + if(_delegate != null) + { + _delegate.reply(size); + } } final MetricsUpdate<IceMX.DispatchMetrics> _userException = new MetricsUpdate<IceMX.DispatchMetrics>() diff --git a/java/src/IceInternal/EndpointHostResolver.java b/java/src/IceInternal/EndpointHostResolver.java index 25f144c66a5..57ed431299d 100644 --- a/java/src/IceInternal/EndpointHostResolver.java +++ b/java/src/IceInternal/EndpointHostResolver.java @@ -52,7 +52,7 @@ public class EndpointHostResolver } } - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); Ice.Instrumentation.Observer observer = null; if(obsv != null) { @@ -110,7 +110,7 @@ public class EndpointHostResolver entry.endpoint = endpoint; entry.callback = callback; - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) { entry.observer = obsv.getEndpointLookupObserver(endpoint); @@ -240,7 +240,7 @@ public class EndpointHostResolver synchronized public void updateObserver() { - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) { _observer = obsv.getThreadObserver("Communicator", diff --git a/java/src/IceInternal/Incoming.java b/java/src/IceInternal/Incoming.java index cccdfc43a33..5b76d223f8c 100644 --- a/java/src/IceInternal/Incoming.java +++ b/java/src/IceInternal/Incoming.java @@ -123,7 +123,7 @@ final public class Incoming extends IncomingBase implements Ice.Request _current.ctx.put(first, second); } - CommunicatorObserver obsv = _instance.initializationData().observer; + CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) { // Read the parameter encapsulation size. diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index 03fbc58c434..a837018cd6a 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -531,6 +531,13 @@ public final class Instance return result; } + public Ice.Instrumentation.CommunicatorObserver + getObserver() + { + return _observer; // Immutable + } + + public synchronized void setDefaultLocator(Ice.LocatorPrx locator) { @@ -817,18 +824,20 @@ public final class Instance // Setup the communicator observer only if the user didn't already set an // Ice observer resolver and if the admininistrative endpoints are set. // - if(_initData.observer == null && - (_adminFacetFilter.isEmpty() || _adminFacetFilter.contains("Metrics")) && + if((_adminFacetFilter.isEmpty() || _adminFacetFilter.contains("Metrics")) && _initData.properties.getProperty("Ice.Admin.Endpoints").length() > 0) { - CommunicatorObserverI observer = new CommunicatorObserverI(admin); - _initData.observer = observer; + _observer = new CommunicatorObserverI(admin, _initData.observer); // // Make sure the admin plugin receives property updates. // props.addUpdateCallback(admin); } + else + { + _observer = _initData.observer; + } } catch(Ice.LocalException ex) { @@ -882,9 +891,9 @@ public final class Instance // // Set observer updater // - if(_initData.observer != null) + if(_observer != null) { - _initData.observer.setObserverUpdater(new ObserverUpdaterI(this)); + _observer.setObserverUpdater(new ObserverUpdaterI(this)); } // @@ -1235,6 +1244,7 @@ public final class Instance private final int _clientACM; // Immutable, not reset by destroy(). private final int _serverACM; // Immutable, not reset by destroy(). private final Ice.ImplicitContextI _implicitContext; + private final Ice.Instrumentation.CommunicatorObserver _observer; private RouterManager _routerManager; private LocatorManager _locatorManager; private ReferenceFactory _referenceFactory; diff --git a/java/src/IceInternal/InvocationObserverI.java b/java/src/IceInternal/InvocationObserverI.java index c36b8a581c3..dd98db9df23 100644 --- a/java/src/IceInternal/InvocationObserverI.java +++ b/java/src/IceInternal/InvocationObserverI.java @@ -11,7 +11,8 @@ package IceInternal; import IceMX.*; -public class InvocationObserverI extends IceMX.Observer<IceMX.InvocationMetrics> +public class InvocationObserverI + extends IceMX.ObserverWithDelegate<IceMX.InvocationMetrics, Ice.Instrumentation.InvocationObserver> implements Ice.Instrumentation.InvocationObserver { static public final class RemoteInvocationHelper extends MetricsHelper<RemoteMetrics> @@ -117,21 +118,35 @@ public class InvocationObserverI extends IceMX.Observer<IceMX.InvocationMetrics> userException() { forEach(_userException); + if(_delegate != null) + { + _delegate.userException(); + } } public void retried() { forEach(_incrementRetry); + if(_delegate != null) + { + _delegate.retried(); + } } public Ice.Instrumentation.RemoteObserver getRemoteObserver(Ice.ConnectionInfo con, Ice.Endpoint edpt, int requestId, int sz) { + Ice.Instrumentation.RemoteObserver delegate = null; + if(_delegate != null) + { + delegate = _delegate.getRemoteObserver(con, edpt, requestId, sz); + } return (Ice.Instrumentation.RemoteObserver)getObserver("Remote", new RemoteInvocationHelper(con, edpt, requestId, sz), RemoteMetrics.class, - RemoteObserverI.class); + RemoteObserverI.class, + delegate); } final MetricsUpdate<InvocationMetrics> _incrementRetry = new MetricsUpdate<InvocationMetrics>() diff --git a/java/src/IceInternal/ObserverHelper.java b/java/src/IceInternal/ObserverHelper.java index d2b2be71177..6e1514baa8f 100644 --- a/java/src/IceInternal/ObserverHelper.java +++ b/java/src/IceInternal/ObserverHelper.java @@ -17,7 +17,7 @@ public final class ObserverHelper static public InvocationObserver get(Instance instance, String op) { - CommunicatorObserver obsv = instance.initializationData().observer; + CommunicatorObserver obsv = instance.getObserver(); if(obsv != null) { InvocationObserver observer = obsv.getInvocationObserver(null, op, _emptyContext); @@ -39,8 +39,7 @@ public final class ObserverHelper static public InvocationObserver get(Ice.ObjectPrx proxy, String op, java.util.Map<String, String> context) { - CommunicatorObserver obsv = - ((Ice.ObjectPrxHelperBase)proxy).__reference().getInstance().initializationData().observer; + CommunicatorObserver obsv = ((Ice.ObjectPrxHelperBase)proxy).__reference().getInstance().getObserver(); if(obsv != null) { InvocationObserver observer; diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index 82e7c1adfe8..3bf55802f61 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -217,7 +217,7 @@ public final class OutgoingConnectionFactory // Try to establish the connection to the connectors. // DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); java.util.Iterator<ConnectorInfo> q = connectors.iterator(); ConnectorInfo ci = null; while(q.hasNext()) @@ -1212,7 +1212,7 @@ public final class OutgoingConnectionFactory assert(_iter.hasNext()); _current = _iter.next(); - Ice.Instrumentation.CommunicatorObserver obsv = _factory._instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _factory._instance.getObserver(); if(obsv != null) { _observer = obsv.getConnectionEstablishmentObserver(_current.endpoint, diff --git a/java/src/IceInternal/RemoteObserverI.java b/java/src/IceInternal/RemoteObserverI.java index 4ebc62e51d2..eb5b470a765 100644 --- a/java/src/IceInternal/RemoteObserverI.java +++ b/java/src/IceInternal/RemoteObserverI.java @@ -9,7 +9,8 @@ package IceInternal; -public class RemoteObserverI extends IceMX.Observer<IceMX.RemoteMetrics> +public class RemoteObserverI + extends IceMX.ObserverWithDelegate<IceMX.RemoteMetrics, Ice.Instrumentation.RemoteObserver> implements Ice.Instrumentation.RemoteObserver { public void @@ -23,5 +24,9 @@ public class RemoteObserverI extends IceMX.Observer<IceMX.RemoteMetrics> v.replySize += size; } }); + if(_delegate != null) + { + _delegate.reply(size); + } } }
\ No newline at end of file diff --git a/java/src/IceInternal/ThreadObserverI.java b/java/src/IceInternal/ThreadObserverI.java index 7086618b4f7..f92784b12c1 100644 --- a/java/src/IceInternal/ThreadObserverI.java +++ b/java/src/IceInternal/ThreadObserverI.java @@ -9,7 +9,9 @@ package IceInternal; -public class ThreadObserverI extends IceMX.Observer<IceMX.ThreadMetrics> implements Ice.Instrumentation.ThreadObserver +public class ThreadObserverI + extends IceMX.ObserverWithDelegate<IceMX.ThreadMetrics, Ice.Instrumentation.ThreadObserver> + implements Ice.Instrumentation.ThreadObserver { public void stateChanged(final Ice.Instrumentation.ThreadState oldState, final Ice.Instrumentation.ThreadState newState) @@ -17,6 +19,10 @@ public class ThreadObserverI extends IceMX.Observer<IceMX.ThreadMetrics> impleme _oldState = oldState; _newState = newState; forEach(_threadStateUpdate); + if(_delegate != null) + { + _delegate.stateChanged(oldState, newState); + } } private MetricsUpdate<IceMX.ThreadMetrics> _threadStateUpdate = new MetricsUpdate<IceMX.ThreadMetrics>() diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 19783697d1d..b7bc1b83c50 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -622,7 +622,7 @@ public final class ThreadPool updateObserver() { // Must be called with the thread pool mutex locked - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; + Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) { _observer = obsv.getThreadObserver(_prefix, _name, _state, _observer); diff --git a/java/src/IceMX/ObserverFactory.java b/java/src/IceMX/ObserverFactory.java index 9a99e7a86e1..3881877c427 100644 --- a/java/src/IceMX/ObserverFactory.java +++ b/java/src/IceMX/ObserverFactory.java @@ -29,14 +29,6 @@ public class ObserverFactory<T extends Metrics, O extends Observer<T>> }); } - public - ObserverFactory(String name, Class<T> cl) - { - _name = name; - _metrics = null; - _class = cl; - } - public void destroy() { @@ -56,7 +48,14 @@ public class ObserverFactory<T extends Metrics, O extends Observer<T>> public synchronized O getObserver(MetricsHelper<T> helper, Object observer, Class<O> cl) { - O old = (O)observer; + O old = null; + try + { + old = (O)observer; + } + catch(ClassCastException ex) + { + } java.util.List<MetricsMap<T>.Entry> metricsObjects = null; for(MetricsMap<T> m : _maps) { diff --git a/java/src/IceMX/ObserverFactoryWithDelegate.java b/java/src/IceMX/ObserverFactoryWithDelegate.java new file mode 100644 index 00000000000..e900d09df40 --- /dev/null +++ b/java/src/IceMX/ObserverFactoryWithDelegate.java @@ -0,0 +1,48 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceMX; + +public class ObserverFactoryWithDelegate<T extends Metrics, + OImpl extends ObserverWithDelegate<T, O>, + O extends Ice.Instrumentation.Observer> + extends ObserverFactory<T, OImpl> +{ + public + ObserverFactoryWithDelegate(IceInternal.MetricsAdminI metrics, String name, Class<T> cl) + { + super(metrics, name, cl); + } + + @SuppressWarnings("unchecked") + public O + getObserver(MetricsHelper<T> helper, Class<OImpl> cl, O delegate) + { + OImpl o = super.getObserver(helper, cl); + if(o != null) + { + o.setDelegate(delegate); + return (O)o; + } + return delegate; + } + + @SuppressWarnings("unchecked") + public O + getObserver(MetricsHelper<T> helper, Object observer, Class<OImpl> cl, O delegate) + { + OImpl o = super.getObserver(helper, observer, cl); + if(o != null) + { + o.setDelegate(delegate); + return (O)o; + } + return delegate; + } +}; diff --git a/java/src/IceMX/ObserverWithDelegate.java b/java/src/IceMX/ObserverWithDelegate.java new file mode 100644 index 00000000000..a699533e282 --- /dev/null +++ b/java/src/IceMX/ObserverWithDelegate.java @@ -0,0 +1,71 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceMX; + +public class ObserverWithDelegate<T extends Metrics, O extends Ice.Instrumentation.Observer> extends Observer<T> +{ + public void + attach() + { + super.attach(); + if(_delegate != null) + { + _delegate.attach(); + } + } + + public void + detach() + { + super.detach(); + if(_delegate != null) + { + _delegate.detach(); + } + } + + public void + failed(String exceptionName) + { + super.failed(exceptionName); + if(_delegate != null) + { + _delegate.failed(exceptionName); + } + } + + public O + getDelegate() + { + return _delegate; + } + + public void + setDelegate(O del) + { + _delegate = del; + } + + @SuppressWarnings("unchecked") + public <S extends Metrics, ObserverImpl extends ObserverWithDelegate<S, Obs>, + Obs extends Ice.Instrumentation.Observer> Obs + getObserver(String mapName, MetricsHelper<S> helper, Class<S> mcl, Class<ObserverImpl> ocl, Obs delegate) + { + ObserverImpl obsv = super.getObserver(mapName, helper, mcl, ocl); + if(obsv != null) + { + obsv.setDelegate(delegate); + return (Obs)obsv; + } + return delegate; + } + + protected O _delegate; +}; diff --git a/java/src/IceMX/ObserverI.java b/java/src/IceMX/ObserverWithDelegateI.java index 23baf30408d..ab0c72c48ee 100644 --- a/java/src/IceMX/ObserverI.java +++ b/java/src/IceMX/ObserverWithDelegateI.java @@ -9,6 +9,6 @@ package IceMX; -public class ObserverI extends Observer<Metrics> +public class ObserverWithDelegateI extends ObserverWithDelegate<Metrics, Ice.Instrumentation.Observer> { }; diff --git a/java/test/Ice/metrics/AllTests.java b/java/test/Ice/metrics/AllTests.java index 6c36493f621..35c874eaf8f 100644 --- a/java/test/Ice/metrics/AllTests.java +++ b/java/test/Ice/metrics/AllTests.java @@ -397,7 +397,7 @@ public class AllTests } static MetricsPrx - allTests(Ice.Communicator communicator, PrintWriter out) + allTests(Ice.Communicator communicator, PrintWriter out, CommunicatorObserverI obsv) throws IceMX.UnknownMetricsView { MetricsPrx metrics = MetricsPrxHelper.checkedCast(communicator.stringToProxy("metrics:default -p 12010")); @@ -1076,6 +1076,41 @@ public class AllTests out.println("ok"); + out.print("testing instrumentation observer delegate... "); + out.flush(); + + test(obsv.threadObserver.total > 0); + test(obsv.connectionObserver.total > 0); + test(obsv.connectionEstablishmentObserver.total > 0); + test(obsv.endpointLookupObserver.total > 0); + test(obsv.dispatchObserver.total > 0); + test(obsv.invocationObserver.total > 0); + test(obsv.invocationObserver.remoteObserver.total > 0); + + test(obsv.threadObserver.current > 0); + test(obsv.connectionObserver.current > 0); + test(obsv.connectionEstablishmentObserver.current == 0); + test(obsv.endpointLookupObserver.current == 0); + test(obsv.dispatchObserver.current == 0); + test(obsv.invocationObserver.current == 0); + test(obsv.invocationObserver.remoteObserver.current == 0); + + test(obsv.threadObserver.failedCount == 0); + test(obsv.connectionObserver.failedCount > 0); + test(obsv.connectionEstablishmentObserver.failedCount > 0); + test(obsv.endpointLookupObserver.failedCount > 0); + //test(obsv.dispatchObserver.failedCount > 0); + test(obsv.invocationObserver.failedCount > 0); + test(obsv.invocationObserver.remoteObserver.failedCount > 0); + + test(obsv.threadObserver.states > 0); + test(obsv.connectionObserver.received > 0 && obsv.connectionObserver.sent > 0); + //test(obsv.dispatchObserver.userExceptionCount > 0); + test(obsv.invocationObserver.userExceptionCount > 0 && obsv.invocationObserver.retriedCount > 0); + test(obsv.invocationObserver.remoteObserver.replySize > 0); + + out.println("ok"); + return metrics; } } diff --git a/java/test/Ice/metrics/Client.java b/java/test/Ice/metrics/Client.java index 543819d1672..7ed4770bf06 100644 --- a/java/test/Ice/metrics/Client.java +++ b/java/test/Ice/metrics/Client.java @@ -18,7 +18,7 @@ public class Client extends test.Util.Application Ice.Communicator communicator = communicator(); try { - MetricsPrx metrics = AllTests.allTests(communicator, getWriter()); + MetricsPrx metrics = AllTests.allTests(communicator, getWriter(), _observer); metrics.shutdown(); } catch(Ice.UserException ex) @@ -40,6 +40,7 @@ public class Client extends test.Util.Application initData.properties.setProperty("Ice.Admin.DelayCreation", "1"); initData.properties.setProperty("Ice.Warn.Connections", "0"); initData.properties.setProperty("Ice.MessageSizeMax", "50000"); + initData.observer = _observer; return initData; } @@ -50,4 +51,6 @@ public class Client extends test.Util.Application System.gc(); System.exit(result); } + + private CommunicatorObserverI _observer = new CommunicatorObserverI(); } diff --git a/java/test/Ice/metrics/CommunicatorObserverI.java b/java/test/Ice/metrics/CommunicatorObserverI.java new file mode 100644 index 00000000000..2973880f884 --- /dev/null +++ b/java/test/Ice/metrics/CommunicatorObserverI.java @@ -0,0 +1,140 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class CommunicatorObserverI implements Ice.Instrumentation.CommunicatorObserver +{ + private static void + test(boolean b) + { + if(!b) + { + throw new RuntimeException(); + } + } + + public void + setObserverUpdater(Ice.Instrumentation.ObserverUpdater u) + { + updater = u; + } + + synchronized public Ice.Instrumentation.Observer + getConnectionEstablishmentObserver(Ice.Endpoint e, String s) + { + if(connectionEstablishmentObserver == null) + { + connectionEstablishmentObserver = new ObserverI(); + connectionEstablishmentObserver.reset(); + } + return connectionEstablishmentObserver; + } + + + synchronized public Ice.Instrumentation.Observer + getEndpointLookupObserver(Ice.Endpoint e) + { + if(endpointLookupObserver == null) + { + endpointLookupObserver = new ObserverI(); + endpointLookupObserver.reset(); + } + return endpointLookupObserver; + } + + synchronized public Ice.Instrumentation.ConnectionObserver + getConnectionObserver(Ice.ConnectionInfo c, + Ice.Endpoint e, + Ice.Instrumentation.ConnectionState s, + Ice.Instrumentation.ConnectionObserver old) + { + test(old == null || old instanceof ConnectionObserverI); + if(connectionObserver == null) + { + connectionObserver = new ConnectionObserverI(); + connectionObserver.reset(); + } + return connectionObserver; + } + + synchronized public Ice.Instrumentation.ThreadObserver + getThreadObserver(String p, String id, Ice.Instrumentation.ThreadState s, + Ice.Instrumentation.ThreadObserver old) + { + test(old == null || old instanceof ThreadObserverI); + if(threadObserver == null) + { + threadObserver = new ThreadObserverI(); + threadObserver.reset(); + } + return threadObserver; + } + + synchronized public Ice.Instrumentation.InvocationObserver + getInvocationObserver(Ice.ObjectPrx p, String op, java.util.Map<String, String> ctx) + { + if(invocationObserver == null) + { + invocationObserver = new InvocationObserverI(); + invocationObserver.reset(); + } + return invocationObserver; + } + + synchronized public Ice.Instrumentation.DispatchObserver + getDispatchObserver(Ice.Current current, int s) + { + if(dispatchObserver == null) + { + dispatchObserver = new DispatchObserverI(); + dispatchObserver.reset(); + } + return dispatchObserver; + } + + synchronized void + reset() + { + if(connectionEstablishmentObserver != null) + { + connectionEstablishmentObserver.reset(); + } + if(endpointLookupObserver != null) + { + endpointLookupObserver.reset(); + } + if(connectionObserver != null) + { + connectionObserver.reset(); + } + if(threadObserver != null) + { + threadObserver.reset(); + } + if(invocationObserver != null) + { + invocationObserver.reset(); + } + if(dispatchObserver != null) + { + dispatchObserver.reset(); + } + } + + Ice.Instrumentation.ObserverUpdater updater; + + ObserverI connectionEstablishmentObserver; + ObserverI endpointLookupObserver; + ConnectionObserverI connectionObserver; + ThreadObserverI threadObserver; + InvocationObserverI invocationObserver; + DispatchObserverI dispatchObserver; +}; + diff --git a/java/test/Ice/metrics/ConnectionObserverI.java b/java/test/Ice/metrics/ConnectionObserverI.java new file mode 100644 index 00000000000..721d527d748 --- /dev/null +++ b/java/test/Ice/metrics/ConnectionObserverI.java @@ -0,0 +1,37 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class ConnectionObserverI extends ObserverI implements Ice.Instrumentation.ConnectionObserver +{ + public synchronized void + reset() + { + super.reset(); + received = 0; + sent = 0; + } + + public synchronized void + sentBytes(int s) + { + sent += s; + } + + public synchronized void + receivedBytes(int s) + { + received += s; + } + + int sent; + int received; +}; + diff --git a/java/test/Ice/metrics/DispatchObserverI.java b/java/test/Ice/metrics/DispatchObserverI.java new file mode 100644 index 00000000000..1a29c236bd9 --- /dev/null +++ b/java/test/Ice/metrics/DispatchObserverI.java @@ -0,0 +1,37 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class DispatchObserverI extends ObserverI implements Ice.Instrumentation.DispatchObserver +{ + public synchronized void + reset() + { + super.reset(); + userExceptionCount = 0; + replySize = 0; + } + + public synchronized void + userException() + { + ++userExceptionCount; + } + + public synchronized void + reply(int s) + { + replySize += s; + } + + int userExceptionCount; + int replySize; +}; + diff --git a/java/test/Ice/metrics/InvocationObserverI.java b/java/test/Ice/metrics/InvocationObserverI.java new file mode 100644 index 00000000000..447ea094c5b --- /dev/null +++ b/java/test/Ice/metrics/InvocationObserverI.java @@ -0,0 +1,53 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class InvocationObserverI extends ObserverI implements Ice.Instrumentation.InvocationObserver +{ + public synchronized void + reset() + { + super.reset(); + retriedCount = 0; + userExceptionCount = 0; + if(remoteObserver != null) + { + remoteObserver.reset(); + } + } + + public synchronized void + retried() + { + ++retriedCount; + } + + public synchronized void + userException() + { + ++userExceptionCount; + } + + public synchronized Ice.Instrumentation.RemoteObserver + getRemoteObserver(Ice.ConnectionInfo c, Ice.Endpoint e, int a, int b) + { + if(remoteObserver == null) + { + remoteObserver = new RemoteObserverI(); + remoteObserver.reset(); + } + return remoteObserver; + } + + int userExceptionCount; + int retriedCount; + + RemoteObserverI remoteObserver = null; +}; diff --git a/java/test/Ice/metrics/ObserverI.java b/java/test/Ice/metrics/ObserverI.java new file mode 100644 index 00000000000..46ec62a275b --- /dev/null +++ b/java/test/Ice/metrics/ObserverI.java @@ -0,0 +1,42 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class ObserverI implements Ice.Instrumentation.Observer +{ + synchronized public void + reset() + { + total = 0; + current = 0; + failedCount = 0; + } + + synchronized public void + attach() + { + ++total; + ++current; + } + synchronized public void + detach() + { + --current; + } + synchronized public void + failed(String s) + { + ++failedCount; + } + + int total; + int current; + int failedCount; +}; diff --git a/java/test/Ice/metrics/RemoveObserverI.java b/java/test/Ice/metrics/RemoveObserverI.java new file mode 100644 index 00000000000..82ecf2c9016 --- /dev/null +++ b/java/test/Ice/metrics/RemoveObserverI.java @@ -0,0 +1,29 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class RemoteObserverI extends ObserverI implements Ice.Instrumentation.RemoteObserver +{ + public synchronized void + reset() + { + super.reset(); + replySize = 0; + } + + public synchronized void + reply(int s) + { + replySize += s; + } + + int replySize; +}; + diff --git a/java/test/Ice/metrics/ThreadObserverI.java b/java/test/Ice/metrics/ThreadObserverI.java new file mode 100644 index 00000000000..4221039ae22 --- /dev/null +++ b/java/test/Ice/metrics/ThreadObserverI.java @@ -0,0 +1,29 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.metrics; + +class ThreadObserverI extends ObserverI implements Ice.Instrumentation.ThreadObserver +{ + public synchronized void + reset() + { + super.reset(); + states = 0; + } + + public synchronized void + stateChanged(Ice.Instrumentation.ThreadState o, Ice.Instrumentation.ThreadState n) + { + ++states; + } + + int states; +}; + |