diff options
Diffstat (limited to 'cpp/src/Ice/Instance.cpp')
-rw-r--r-- | cpp/src/Ice/Instance.cpp | 145 |
1 files changed, 109 insertions, 36 deletions
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index 6e7734979a4..0247b858704 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -37,13 +37,13 @@ #include <IceUtil/StringUtil.h> #include <Ice/PropertiesI.h> #include <Ice/Communicator.h> -#include <Ice/MetricsAdminI.h> #include <Ice/InstrumentationI.h> #include <Ice/ProtocolInstance.h> #include <Ice/LoggerAdminI.h> #include <IceUtil/UUID.h> #include <IceUtil/Mutex.h> #include <IceUtil/MutexPtrLock.h> +#include <Ice/ObserverHelper.h> #include <stdio.h> #include <list> @@ -186,6 +186,84 @@ private: const InstancePtr _instance; }; + +// +// Timer specialization which supports the thread observer +// +class Timer : public IceUtil::Timer +{ +public: + + Timer(int priority) : IceUtil::Timer(priority) + { + } + + Timer() + { + } + + void updateObserver(const Ice::Instrumentation::CommunicatorObserverPtr&); + +private: + + virtual void runTimerTask(const IceUtil::TimerTaskPtr&); + + IceUtil::Mutex _mutex; + volatile bool _hasObserver; + ObserverHelperT<Ice::Instrumentation::ThreadObserver> _observer; +}; + +} + +void +Timer::updateObserver(const Ice::Instrumentation::CommunicatorObserverPtr& obsv) +{ + IceUtil::Mutex::Lock sync(_mutex); + assert(obsv); + _observer.attach(obsv->getThreadObserver("Communicator", + "Ice.Timer", + Ice::Instrumentation::ThreadStateIdle, + _observer.get())); + _hasObserver = _observer.get(); +} + +void +Timer::runTimerTask(const IceUtil::TimerTaskPtr& task) +{ + if(_hasObserver) + { + Ice::Instrumentation::ThreadObserverPtr threadObserver; + { + IceUtil::Mutex::Lock sync(_mutex); + threadObserver = _observer.get(); + } + if(threadObserver) + { + threadObserver->stateChanged(Ice::Instrumentation::ThreadStateIdle, + Ice::Instrumentation::ThreadStateInUseForOther); + } + try + { + task->runTimerTask(); + } + catch(...) + { + if(threadObserver) + { + threadObserver->stateChanged(Ice::Instrumentation::ThreadStateInUseForOther, + Ice::Instrumentation::ThreadStateIdle); + } + } + if(threadObserver) + { + threadObserver->stateChanged(Ice::Instrumentation::ThreadStateInUseForOther, + Ice::Instrumentation::ThreadStateIdle); + } + } + else + { + task->runTimerTask(); + } } IceUtil::Shared* IceInternal::upCast(Instance* p) { return p; } @@ -1394,44 +1472,42 @@ IceInternal::Instance::finishSetup(int& argc, char* argv[], const Ice::Communica } PropertiesAdminIPtr propsAdmin; - if(_adminEnabled) { _adminFacets.insert(FacetMap::value_type("Process", new ProcessI(communicator))); propsAdmin = new PropertiesAdminI("Properties", _initData.properties, _initData.logger); _adminFacets.insert(FacetMap::value_type("Properties", propsAdmin)); - - _metricsAdmin = new MetricsAdminI(_initData.properties, _initData.logger); - _adminFacets.insert(FacetMap::value_type("Metrics", _metricsAdmin)); } - + // - // Setup the communicator observer only if the user didn't already set an - // Ice observer resolver and Admin is enabled + // Setup the communicator observer if Admin is enabled and the + // facet isn't filtered or if Ice.Admin.Metrics is enabled. // - if(_adminEnabled && (_adminFacetFilter.empty() || _adminFacetFilter.find("Metrics") != _adminFacetFilter.end())) + if((_adminEnabled && (_adminFacetFilter.empty() || _adminFacetFilter.find("Metrics") != _adminFacetFilter.end())) || + _initData.properties->getPropertyAsInt("Ice.Admin.Metrics")) { - _observer = new CommunicatorObserverI(_metricsAdmin, _initData.observer); + CommunicatorObserverIPtr observer = new CommunicatorObserverI(_initData); + _initData.observer = observer; + _adminFacets.insert(FacetMap::value_type("Metrics", observer->getFacet())); // - // Make sure the admin plugin receives property updates. + // Make sure the metrics admin facet receives property updates. // - propsAdmin->addUpdateCallback(_metricsAdmin); - } - else - { - _observer = _initData.observer; + if(propsAdmin) + { + propsAdmin->addUpdateCallback(observer->getFacet()); + } } - + // // Set observer updater // - if(_observer) + if(_initData.observer) { - _observer->setObserverUpdater(new ObserverUpdaterI(this)); + _initData.observer->setObserverUpdater(new ObserverUpdaterI(this)); } - + // // Create threads. // @@ -1441,11 +1517,11 @@ IceInternal::Instance::finishSetup(int& argc, char* argv[], const Ice::Communica int priority = _initData.properties->getPropertyAsInt("Ice.ThreadPriority"); if(hasPriority) { - _timer = new IceUtil::Timer(priority); + _timer = new Timer(priority); } else { - _timer = new IceUtil::Timer; + _timer = new Timer; } } catch(const IceUtil::Exception& ex) @@ -1590,21 +1666,14 @@ IceInternal::Instance::destroy() _retryQueue->destroy(); } - if(_metricsAdmin) + if(_initData.observer) { - _metricsAdmin->destroy(); - _metricsAdmin = 0; - - if(_observer) - { - // Break cyclic reference counts. Don't clear _observer, it's immutable. - CommunicatorObserverIPtr observer = CommunicatorObserverIPtr::dynamicCast(_observer); - if(observer) - { - observer->destroy(); - } - _observer->setObserverUpdater(0); // Break cyclic reference count. + CommunicatorObserverIPtr observer = CommunicatorObserverIPtr::dynamicCast(_initData.observer); + if(observer) + { + observer->destroy(); // Break cyclic reference counts. Don't clear _observer, it's immutable. } + _initData.observer->setObserverUpdater(0); // Break cyclic reference count. } Ice::LoggerAdminLoggerPtr logger = Ice::LoggerAdminLoggerPtr::dynamicCast(_initData.logger); @@ -1619,7 +1688,7 @@ IceInternal::Instance::destroy() ThreadPoolPtr serverThreadPool; ThreadPoolPtr clientThreadPool; EndpointHostResolverPtr endpointHostResolver; - IceUtil::TimerPtr timer; + TimerPtr timer; { IceUtil::RecMutex::Lock sync(*this); @@ -1766,6 +1835,10 @@ IceInternal::Instance::updateThreadObservers() { _endpointHostResolver->updateObserver(); } + if(_timer) + { + _timer->updateObserver(_initData.observer); + } } catch(const Ice::CommunicatorDestroyedException&) { |