summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Instance.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Instance.cpp')
-rw-r--r--cpp/src/Ice/Instance.cpp145
1 files changed, 109 insertions, 36 deletions
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp
index 6e7734979a4..0247b858704 100644
--- a/cpp/src/Ice/Instance.cpp
+++ b/cpp/src/Ice/Instance.cpp
@@ -37,13 +37,13 @@
#include <IceUtil/StringUtil.h>
#include <Ice/PropertiesI.h>
#include <Ice/Communicator.h>
-#include <Ice/MetricsAdminI.h>
#include <Ice/InstrumentationI.h>
#include <Ice/ProtocolInstance.h>
#include <Ice/LoggerAdminI.h>
#include <IceUtil/UUID.h>
#include <IceUtil/Mutex.h>
#include <IceUtil/MutexPtrLock.h>
+#include <Ice/ObserverHelper.h>
#include <stdio.h>
#include <list>
@@ -186,6 +186,84 @@ private:
const InstancePtr _instance;
};
+
+//
+// Timer specialization which supports the thread observer
+//
+class Timer : public IceUtil::Timer
+{
+public:
+
+ Timer(int priority) : IceUtil::Timer(priority)
+ {
+ }
+
+ Timer()
+ {
+ }
+
+ void updateObserver(const Ice::Instrumentation::CommunicatorObserverPtr&);
+
+private:
+
+ virtual void runTimerTask(const IceUtil::TimerTaskPtr&);
+
+ IceUtil::Mutex _mutex;
+ volatile bool _hasObserver;
+ ObserverHelperT<Ice::Instrumentation::ThreadObserver> _observer;
+};
+
+}
+
+void
+Timer::updateObserver(const Ice::Instrumentation::CommunicatorObserverPtr& obsv)
+{
+ IceUtil::Mutex::Lock sync(_mutex);
+ assert(obsv);
+ _observer.attach(obsv->getThreadObserver("Communicator",
+ "Ice.Timer",
+ Ice::Instrumentation::ThreadStateIdle,
+ _observer.get()));
+ _hasObserver = _observer.get();
+}
+
+void
+Timer::runTimerTask(const IceUtil::TimerTaskPtr& task)
+{
+ if(_hasObserver)
+ {
+ Ice::Instrumentation::ThreadObserverPtr threadObserver;
+ {
+ IceUtil::Mutex::Lock sync(_mutex);
+ threadObserver = _observer.get();
+ }
+ if(threadObserver)
+ {
+ threadObserver->stateChanged(Ice::Instrumentation::ThreadStateIdle,
+ Ice::Instrumentation::ThreadStateInUseForOther);
+ }
+ try
+ {
+ task->runTimerTask();
+ }
+ catch(...)
+ {
+ if(threadObserver)
+ {
+ threadObserver->stateChanged(Ice::Instrumentation::ThreadStateInUseForOther,
+ Ice::Instrumentation::ThreadStateIdle);
+ }
+ }
+ if(threadObserver)
+ {
+ threadObserver->stateChanged(Ice::Instrumentation::ThreadStateInUseForOther,
+ Ice::Instrumentation::ThreadStateIdle);
+ }
+ }
+ else
+ {
+ task->runTimerTask();
+ }
}
IceUtil::Shared* IceInternal::upCast(Instance* p) { return p; }
@@ -1394,44 +1472,42 @@ IceInternal::Instance::finishSetup(int& argc, char* argv[], const Ice::Communica
}
PropertiesAdminIPtr propsAdmin;
-
if(_adminEnabled)
{
_adminFacets.insert(FacetMap::value_type("Process", new ProcessI(communicator)));
propsAdmin = new PropertiesAdminI("Properties", _initData.properties, _initData.logger);
_adminFacets.insert(FacetMap::value_type("Properties", propsAdmin));
-
- _metricsAdmin = new MetricsAdminI(_initData.properties, _initData.logger);
- _adminFacets.insert(FacetMap::value_type("Metrics", _metricsAdmin));
}
-
+
//
- // Setup the communicator observer only if the user didn't already set an
- // Ice observer resolver and Admin is enabled
+ // Setup the communicator observer if Admin is enabled and the
+ // facet isn't filtered or if Ice.Admin.Metrics is enabled.
//
- if(_adminEnabled && (_adminFacetFilter.empty() || _adminFacetFilter.find("Metrics") != _adminFacetFilter.end()))
+ if((_adminEnabled && (_adminFacetFilter.empty() || _adminFacetFilter.find("Metrics") != _adminFacetFilter.end())) ||
+ _initData.properties->getPropertyAsInt("Ice.Admin.Metrics"))
{
- _observer = new CommunicatorObserverI(_metricsAdmin, _initData.observer);
+ CommunicatorObserverIPtr observer = new CommunicatorObserverI(_initData);
+ _initData.observer = observer;
+ _adminFacets.insert(FacetMap::value_type("Metrics", observer->getFacet()));
//
- // Make sure the admin plugin receives property updates.
+ // Make sure the metrics admin facet receives property updates.
//
- propsAdmin->addUpdateCallback(_metricsAdmin);
- }
- else
- {
- _observer = _initData.observer;
+ if(propsAdmin)
+ {
+ propsAdmin->addUpdateCallback(observer->getFacet());
+ }
}
-
+
//
// Set observer updater
//
- if(_observer)
+ if(_initData.observer)
{
- _observer->setObserverUpdater(new ObserverUpdaterI(this));
+ _initData.observer->setObserverUpdater(new ObserverUpdaterI(this));
}
-
+
//
// Create threads.
//
@@ -1441,11 +1517,11 @@ IceInternal::Instance::finishSetup(int& argc, char* argv[], const Ice::Communica
int priority = _initData.properties->getPropertyAsInt("Ice.ThreadPriority");
if(hasPriority)
{
- _timer = new IceUtil::Timer(priority);
+ _timer = new Timer(priority);
}
else
{
- _timer = new IceUtil::Timer;
+ _timer = new Timer;
}
}
catch(const IceUtil::Exception& ex)
@@ -1590,21 +1666,14 @@ IceInternal::Instance::destroy()
_retryQueue->destroy();
}
- if(_metricsAdmin)
+ if(_initData.observer)
{
- _metricsAdmin->destroy();
- _metricsAdmin = 0;
-
- if(_observer)
- {
- // Break cyclic reference counts. Don't clear _observer, it's immutable.
- CommunicatorObserverIPtr observer = CommunicatorObserverIPtr::dynamicCast(_observer);
- if(observer)
- {
- observer->destroy();
- }
- _observer->setObserverUpdater(0); // Break cyclic reference count.
+ CommunicatorObserverIPtr observer = CommunicatorObserverIPtr::dynamicCast(_initData.observer);
+ if(observer)
+ {
+ observer->destroy(); // Break cyclic reference counts. Don't clear _observer, it's immutable.
}
+ _initData.observer->setObserverUpdater(0); // Break cyclic reference count.
}
Ice::LoggerAdminLoggerPtr logger = Ice::LoggerAdminLoggerPtr::dynamicCast(_initData.logger);
@@ -1619,7 +1688,7 @@ IceInternal::Instance::destroy()
ThreadPoolPtr serverThreadPool;
ThreadPoolPtr clientThreadPool;
EndpointHostResolverPtr endpointHostResolver;
- IceUtil::TimerPtr timer;
+ TimerPtr timer;
{
IceUtil::RecMutex::Lock sync(*this);
@@ -1766,6 +1835,10 @@ IceInternal::Instance::updateThreadObservers()
{
_endpointHostResolver->updateObserver();
}
+ if(_timer)
+ {
+ _timer->updateObserver(_initData.observer);
+ }
}
catch(const Ice::CommunicatorDestroyedException&)
{