From 65e56dc7d56154c4f8af05d885599af71fc35cca Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Mon, 23 Jul 2012 09:00:50 +0200 Subject: Initial metrics --- cpp/src/Ice/ConnectionI.cpp | 105 ++++++++++++++++++++- cpp/src/Ice/ConnectionI.h | 8 ++ cpp/src/Ice/Makefile | 10 +- cpp/src/Ice/MetricsAdminI.cpp | 198 +++++++++++++++++++++++++++++++++++++++ cpp/src/Ice/MetricsAdminI.h | 107 +++++++++++++++++++++ cpp/src/Ice/MetricsObserverI.cpp | 188 +++++++++++++++++++++++++++++++++++++ cpp/src/Ice/MetricsObserverI.h | 179 +++++++++++++++++++++++++++++++++++ 7 files changed, 792 insertions(+), 3 deletions(-) create mode 100644 cpp/src/Ice/MetricsAdminI.cpp create mode 100644 cpp/src/Ice/MetricsAdminI.h create mode 100644 cpp/src/Ice/MetricsObserverI.cpp create mode 100644 cpp/src/Ice/MetricsObserverI.h (limited to 'cpp/src') diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index e1a8e03dd82..7e619ba2e0f 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -25,6 +25,7 @@ #include #include // For createProxy(). #include // For createProxy(). +#include #include using namespace std; @@ -118,6 +119,16 @@ private: ConnectionIPtr _connection; }; +Ice::ConnectionState connectionStateMap[] = { + Ice::ConnectionStateInitializing, // StateNotInitialized + Ice::ConnectionStateInitializing, // StateNotValidated + Ice::ConnectionStateActive, // StateActive + Ice::ConnectionStateHolding, // StateHolding + Ice::ConnectionStateClosing, // StateClosing + Ice::ConnectionStateClosed, // StateClosed + Ice::ConnectionStateClosed, // StateFinished +}; + } void @@ -1145,6 +1156,13 @@ Ice::ConnectionI::startAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { + if(_observer) + { + assert(!_writeWatch.isStarted()); + _writeStreamPos = _writeStream.i; + _writeWatch.start(); + } + if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty()) { // The whole message is written, assume it's sent now for at-most-once semantics. @@ -1153,6 +1171,13 @@ Ice::ConnectionI::startAsync(SocketOperation operation) } else if(operation & SocketOperationRead) { + if(_observer && !_readHeader) + { + assert(!_readWatch.isStarted()); + _readStreamPos = _readStream.i; + _readWatch.start(); + } + _transceiver->startRead(_readStream); } } @@ -1171,10 +1196,20 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { + if(_observer) + { + assert(_writeWatch.isStarted() && _writeStream.i >= _writeStreamPos); + _observer->sentBytes(static_cast(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + } _transceiver->finishWrite(_writeStream); } else if(operation & SocketOperationRead) { + if(_observer && !_readHeader) + { + assert(_readStream.i >= _readStreamPos); + _observer->receivedBytes(static_cast(_readStream.i - _readStreamPos), _readWatch.stop()); + } _transceiver->finishRead(_readStream); } } @@ -1219,12 +1254,29 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) unscheduleTimeout(current.operation); if(current.operation & SocketOperationWrite && !_writeStream.b.empty()) { + if(_observer && _writeStream.i != _writeStream.b.end()) + { + if(_writeWatch.isStarted()) + { + assert(_writeStream.i >= _writeStreamPos); + _observer->sentBytes(static_cast(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + } + _writeStreamPos = _writeStream.i; + _writeWatch.start(); + } + if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream)) { assert(!_writeStream.b.empty()); scheduleTimeout(SocketOperationWrite, _endpoint->timeout()); return; } + + if(_observer) + { + assert(_writeStream.i >= _writeStreamPos); + _observer->sentBytes(static_cast(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + } assert(_writeStream.i == _writeStream.b.end()); } if(current.operation & SocketOperationRead && !_readStream.b.empty()) @@ -1237,6 +1289,17 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } assert(_readStream.i == _readStream.b.end()); _readHeader = false; + + if(_observer) + { + // + // We can't measure the time to receive the header as it would + // include the wait time. We start the timer now. + // + _observer->receivedBytes(static_cast(_readStream.i - _readStream.b.begin()), 0); + _readStreamPos = _readStream.i; + _readWatch.start(); + } ptrdiff_t pos = _readStream.i - _readStream.b.begin(); if(pos < headerSize) @@ -1284,6 +1347,17 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) _readStream.i = _readStream.b.begin() + pos; } + if(_observer && _readStream.i != _readStream.b.end()) + { + if(_readWatch.isStarted()) + { + assert(_readStream.i >= _readStreamPos); + _observer->receivedBytes(static_cast(_readStream.i - _readStreamPos), _readWatch.stop()); + } + _readStreamPos = _readStream.i; + _readWatch.start(); + } + if(_readStream.i != _readStream.b.end()) { if(_endpoint->datagram()) @@ -1301,6 +1375,12 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) assert(_readStream.i == _readStream.b.end()); } } + + if(_observer) + { + assert(_readStream.i >= _readStreamPos); + _observer->receivedBytes(static_cast(_readStream.i - _readStreamPos), _readWatch.stop()); + } } if(_state <= StateNotValidated) @@ -1778,7 +1858,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _state(StateNotInitialized), _shutdownInitiated(false) { - int& compressionLevel = const_cast(_compressionLevel); compressionLevel = _instance->initializationData().properties->getPropertyAsIntWithDefault( "Ice.Compression.Level", 1); @@ -1826,6 +1905,11 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, const_cast(_threadPool) = _instance->clientThreadPool(); } _threadPool->initialize(this); + + if(_instance->initializationData().observerResolver) + { + _observer = _instance->initializationData().observerResolver->getConnectionObserver(_observer, this); + } } catch(const IceUtil::Exception&) { @@ -1833,10 +1917,20 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, throw; } __setNoDelete(false); + + if(_observer) + { + _observer->attach(); + } } Ice::ConnectionI::~ConnectionI() { + if(_observer) + { + _observer->detach(); + } + assert(!_startCallback); assert(_state == StateFinished); assert(_dispatchCount == 0); @@ -2040,6 +2134,15 @@ Ice::ConnectionI::setState(State state) } } + if(_observer) + { + Ice::ConnectionState oldState = connectionStateMap[static_cast(_state)]; + Ice::ConnectionState newState = connectionStateMap[static_cast(state)]; + if(oldState != newState) + { + _observer->stateChanged(oldState, newState); + } + } _state = state; notifyAll(); diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 8648f8f302c..246cf72e9f2 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -30,6 +31,7 @@ #include #include #include +#include #include #include @@ -277,6 +279,7 @@ private: const std::string _type; const IceInternal::ConnectorPtr _connector; const IceInternal::EndpointIPtr _endpoint; + Ice::ConnectionObserverPtr _observer; ObjectAdapterPtr _adapter; IceInternal::ServantManagerPtr _servantManager; @@ -324,6 +327,11 @@ private: bool _readHeader; IceInternal::BasicStream _writeStream; + Ice::Byte* _writeStreamPos; + IceUtilInternal::StopWatch _writeWatch; + Ice::Byte* _readStreamPos; + IceUtilInternal::StopWatch _readWatch; + int _dispatchCount; State _state; // The current state. diff --git a/cpp/src/Ice/Makefile b/cpp/src/Ice/Makefile index 70da4e66b31..00b5893bf4b 100644 --- a/cpp/src/Ice/Makefile +++ b/cpp/src/Ice/Makefile @@ -60,6 +60,9 @@ OBJS = Acceptor.o \ LoggerI.o \ Logger.o \ LoggerUtil.o \ + Metrics.o \ + MetricsAdminI.o \ + MetricsObserverI.o \ Network.o \ ObjectAdapterFactory.o \ ObjectAdapterI.o \ @@ -67,6 +70,7 @@ OBJS = Acceptor.o \ ObjectFactoryManager.o \ ObjectFactory.o \ Object.o \ + Observer.o \ OpaqueEndpointI.o \ OutgoingAsync.o \ Outgoing.o \ @@ -132,10 +136,13 @@ SLICE_SRCS = $(SDIR)/BuiltinSequences.ice \ $(SDIR)/Locator.ice \ $(SDIR)/LoggerF.ice \ $(SDIR)/Logger.ice \ + $(SDIR)/Metrics.ice \ $(SDIR)/ObjectAdapterF.ice \ $(SDIR)/ObjectAdapter.ice \ $(SDIR)/ObjectFactoryF.ice \ $(SDIR)/ObjectFactory.ice \ + $(SDIR)/Observer.ice \ + $(SDIR)/ObserverF.ice \ $(SDIR)/PluginF.ice \ $(SDIR)/Plugin.ice \ $(SDIR)/ProcessF.ice \ @@ -149,8 +156,7 @@ SLICE_SRCS = $(SDIR)/BuiltinSequences.ice \ $(SDIR)/SliceChecksumDict.ice \ $(SDIR)/StatsF.ice \ $(SDIR)/Stats.ice \ - $(SDIR)/Version.ice - + $(SDIR)/Version.ice \ HDIR = $(headerdir)/Ice SDIR = $(slicedir)/Ice diff --git a/cpp/src/Ice/MetricsAdminI.cpp b/cpp/src/Ice/MetricsAdminI.cpp new file mode 100644 index 00000000000..c30e298bafb --- /dev/null +++ b/cpp/src/Ice/MetricsAdminI.cpp @@ -0,0 +1,198 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include + +using namespace std; +using namespace Ice; +using namespace IceMetrics; + + +MetricsMap::MetricsMap(const string& groupBy, const NameValueDict& accept, const NameValueDict& reject) : + _accept(accept), _reject(reject) +{ + // TODO: groupBy +} + +void +MetricsMap::destroy() +{ + _objects.clear(); +} + +MetricsObjectSeq +MetricsMap::getMetricsObjects() const +{ + MetricsObjectSeq objects; + for(map::const_iterator p = _objects.begin(); p != _objects.end(); ++p) + { + // TODO: Fix ice_clone! + objects.push_back(dynamic_cast(p->second->ice_clone().get())); + } + return objects; +} + +MetricsMap::Entry +MetricsMap::getMatching(const MetricsHelper& helper) +{ + for(map::const_iterator p = _accept.begin(); p != _accept.end(); ++p) + { + if(!match(helper(p->first), p->second)) + { + return MetricsMap::Entry(); + } + } + + for(map::const_iterator p = _reject.begin(); p != _reject.end(); ++p) + { + if(match(helper(p->first), p->second)) + { + return MetricsMap::Entry(); + } + } + + ostringstream os; + vector::const_iterator q = _groupBySeparators.begin(); + for(vector::const_iterator p = _groupByAttributes.begin(); p != _groupByAttributes.end(); ++p) + { + os << helper(*p) << *q; + } + + string key = os.str(); + map::const_iterator p = _objects.find(key); + if(p == _objects.end()) + { + Entry e; + e.object = helper.newMetricsObject(); + e.object->id = key; + e.map = this; + p = _objects.insert(make_pair(os.str(), e)).first; + } + return p->second; +} + +bool +MetricsMap::match(const string& value, const string& expr) const +{ + return true; // TODO +} + +MetricsView::MetricsView() +{ +} + +void +MetricsView::add(const string& cl, const string& groupBy, const NameValueDict& accept, const NameValueDict& reject) +{ + _maps.insert(make_pair(cl, new MetricsMap(groupBy, accept, reject))); +} + +void +MetricsView::remove(const string& cl) +{ + _maps.erase(cl); +} + +MetricsObjectSeqDict +MetricsView::getMetricsObjects() const +{ + MetricsObjectSeqDict metrics; + for(map::const_iterator p = _maps.begin(); p != _maps.end(); ++p) + { + metrics.insert(make_pair(p->first, p->second->getMetricsObjects())); + } + return metrics; +} + +MetricsMap::Entry +MetricsView::getMatching(const string& cl, const MetricsHelper& helper) const +{ + map::const_iterator p = _maps.find(cl); + if(p != _maps.end()) + { + return p->second->getMatching(helper); + } + return MetricsMap::Entry() +} + +void +MetricsAdminI::addUpdater(const string& cl, const ObjectObserverUpdaterPtr& updater) +{ + _updaters.insert(make_pair(cl, updater)); +} + +vector +MetricsAdminI::getMatching(const string& cl, const MetricsHelper& helper) const +{ + Lock sync(*this); + vector objects; + for(map::const_iterator p = _views.begin(); p != _views.end(); ++p) + { + MetricsMap::Entry e = p->second->getMatching(cl, helper); + if(e.object) + { + objects.push_back(e); + } + } + return objects; +} + +MetricsObjectSeqDict +MetricsAdminI::getMetrics(const string& view, const Ice::Current&) +{ + Lock sync(*this); + std::map::const_iterator p = _views.find(view); + if(p == _views.end()) + { + throw UnknownMetricsView(); + } + return p->second->getMetricsObjects(); +} + +MetricsObjectSeqDictDict +MetricsAdminI::getAllMetrics(const Ice::Current&) +{ + Lock sync(*this); + MetricsObjectSeqDictDict metrics; + for(map::const_iterator p = _views.begin(); p != _views.end(); ++p) + { + metrics.insert(make_pair(p->first, p->second->getMetricsObjects())); + } + return metrics; +} + +void +MetricsAdminI::addClassToView(const string& view, + const string& cl, + const string& groupBy, + const NameValueDict& accept, + const NameValueDict& reject, + const Ice::Current&) +{ + { + Lock sync(*this); + map::const_iterator p = _views.find(view); + if(p == _views.end()) + { + p = _views.insert(make_pair(view, new MetricsView())).first; + } + p->second->add(cl, groupBy, accept, reject); + } + _updaters[cl]->update(); +} + +void +MetricsAdminI::removeClassFromView(const string& view, const string& cl, const Ice::Current&) +{ + { + Lock sync(*this); + _views.erase(cl); + } + _updaters[cl]->update(); +} diff --git a/cpp/src/Ice/MetricsAdminI.h b/cpp/src/Ice/MetricsAdminI.h new file mode 100644 index 00000000000..a021ae77845 --- /dev/null +++ b/cpp/src/Ice/MetricsAdminI.h @@ -0,0 +1,107 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_METRICSADMIN_I_H +#define ICE_METRICSADMIN_I_H + +#include +#include + +#include + +namespace IceMetrics +{ + +class MetricsHelper +{ +public: + + virtual std::string operator()(const std::string&) const = 0; + + virtual MetricsObjectPtr newMetricsObject() const = 0; +}; + +class MetricsMap : public IceUtil::Shared, IceUtil::Mutex +{ +public: + + struct Entry + { + MetricsObjectPtr object; + MetricsMapPtr map; + + bool operator<(const Entry& e) + { + return object < e.object; + } + }; + + MetricsMap(const std::string&, const NameValueDict&, const NameValueDict&); + + void destroy(); + + MetricsObjectSeq getMetricsObjects() const; + + MetricsObjectPtr getMatching(const MetricsHelper&); + +private: + + bool match(const std::string&, const std::string&) const; + + const std::vector _groupByAttributes; + const std::vector _groupBySeparators; + const NameValueDict _accept; + const NameValueDict _reject; + std::map _objects; +}; +typedef IceUtil::Handle MetricsMapPtr; + +class MetricsView : public IceUtil::Shared +{ +public: + + MetricsView(); + + void add(const std::string&, const std::string&, const NameValueDict&, const NameValueDict&); + void remove(const std::string&); + + MetricsObjectSeqDict getMetricsObjects() const; + + MetricsObjectPtr getMatching(const std::string&, const MetricsHelper&) const; + +private: + + std::map _maps; +}; +typedef IceUtil::Handle MetricsViewPtr; + +class MetricsAdminI : public MetricsAdmin, public IceUtil::Mutex +{ +public: + + MetricsObjectSeq getMatching(const std::string&, const MetricsHelper&) const; + void addUpdater(const std::string&, const ObjectObserverUpdaterPtr&); + + virtual MetricsObjectSeqDict getMetrics(const std::string&, const Ice::Current&); + virtual MetricsObjectSeqDictDict getAllMetrics(const Ice::Current&); + + virtual void addClassToView(const std::string&, const std::string&, const std::string&, const NameValueDict&, + const NameValueDict&, const Ice::Current&); + + virtual void removeClassFromView(const std::string&, const std::string&, const Ice::Current&); + +private: + + std::map _views; + std::map _updaters; +}; + +}; + +#endif diff --git a/cpp/src/Ice/MetricsObserverI.cpp b/cpp/src/Ice/MetricsObserverI.cpp new file mode 100644 index 00000000000..c3e51b170ba --- /dev/null +++ b/cpp/src/Ice/MetricsObserverI.cpp @@ -0,0 +1,188 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include +#include + +using namespace std; +using namespace Ice; +using namespace IceMetrics; + +namespace +{ + +class ConnectionMetricsHelper : public MetricsHelper +{ +public: + + ConnectionMetricsHelper(const ConnectionPtr& con) : _connection(con) + { + } + + virtual string operator()(const string& attribute) const + { + return ""; // TODO: return attribute value + } + + virtual MetricsObjectPtr newMetricsObject() const + { + return new ConnectionMetricsObject(); + } + +private: + + Ice::ConnectionPtr _connection; +}; + +} + +void +ConnectionObserverI::attach() +{ + struct Attach + { + void operator()(const ConnectionMetricsObjectPtr& v) + { + ++v->total; + ++v->current; + ++v->initializing; + } + }; + forEach(Attach()); +} + +void +ConnectionObserverI::detach() +{ + struct Detach + { + void operator()(const ConnectionMetricsObjectPtr& v) + { + ++v->total; + ++v->current; + ++v->initializing; + } + }; + forEach(Detach()); +} + +void +ConnectionObserverI::stateChanged(ConnectionState oldState, ConnectionState newState) +{ + struct StateChanged + { + StateChanged(ConnectionState oldState, ConnectionState newState) : + oldState(oldState), newState(newState) + { + } + + void operator()(const ConnectionMetricsObjectPtr& v) + { + --(v.get()->*getConnectionStateMetric(oldState)); + ++(v.get()->*getConnectionStateMetric(newState)); + } + + int ConnectionMetricsObject::* + getConnectionStateMetric(ConnectionState s) + { + switch(s) + { + case ConnectionStateInitializing: + return &ConnectionMetricsObject::initializing; + case ConnectionStateActive: + return &ConnectionMetricsObject::active; + case ConnectionStateHolding: + return &ConnectionMetricsObject::holding; + case ConnectionStateClosing: + return &ConnectionMetricsObject::closing; + case ConnectionStateClosed: + return &ConnectionMetricsObject::closed; + } + } + + ConnectionState oldState; + ConnectionState newState; + } + forEach(StateChanged(oldState, newState)); +} + +void +ConnectionObserverI::sentBytes(Int num, Long duration) +{ + forEach(aggregate(applyOnMember(&ConnectionMetricsObject::sentBytes, Add(num)), + applyOnMember(&ConnectionMetricsObject::sentTime, Add(duration)))); +} + +void +ConnectionObserverI::receivedBytes(Int num, Long duration) +{ + forEach(aggregate(applyOnMember(&ConnectionMetricsObject::receivedBytes, Add(num)), + applyOnMember(&ConnectionMetricsObject::receivedTime, Add(duration)))); +} + +ObserverResolverI::ObserverResolverI(const MetricsAdminIPtr& metrics) : _metrics(metrics) +{ +} + +void +ObserverResolverI::setObserverUpdater(const ObserverUpdaterPtr& updater) +{ + class Updater : public ObjectObserverUpdater + { + public: + + Updater(const ObserverUpdaterPtr& updater, void (ObserverUpdater::*fn)()) : + _updater(updater), _fn(fn) + { + } + + virtual void update() + { + (_updater.get()->*_fn)(); + } + + private: + + const ObserverUpdaterPtr _updater; + void (ObserverUpdater::*_fn)(); + }; + _metrics->addUpdater("Connection", new Updater(updater, &ObserverUpdater::updateConnectionObservers)); + _metrics->addUpdater("Thread", new Updater(updater, &ObserverUpdater::updateThreadObservers)); + _metrics->addUpdater("ThreadPoolThread", new Updater(updater, &ObserverUpdater::updateThreadPoolThreadObservers)); +} + +ConnectionObserverPtr +ObserverResolverI::getConnectionObserver(const ConnectionObserverPtr& old, const ConnectionPtr& con) +{ + return _connections.getObserver(_metrics->getMatching("Connection", ConnectionMetricsHelper(con)), old.get()); +} + +ObjectObserverPtr +ObserverResolverI::getThreadObserver(const ObjectObserverPtr&, const string&, const string&) +{ + return 0; +} + +ThreadPoolThreadObserverPtr +ObserverResolverI::getThreadPoolThreadObserver(const ThreadPoolThreadObserverPtr&, const string&, const string&) +{ + return 0; +} + +RequestObserverPtr +ObserverResolverI::getInvocationObserver(const RequestObserverPtr&, const ObjectPrx&, const string&) +{ + return 0; +} + +RequestObserverPtr +ObserverResolverI::getDispatchObserver(const RequestObserverPtr&, const ObjectPtr&, const Current&) +{ + return 0; +} diff --git a/cpp/src/Ice/MetricsObserverI.h b/cpp/src/Ice/MetricsObserverI.h new file mode 100644 index 00000000000..7b5b4abbf5a --- /dev/null +++ b/cpp/src/Ice/MetricsObserverI.h @@ -0,0 +1,179 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_METRICSSTATS_I_H +#define ICE_METRICSSTATS_I_H + +#include +#include + +#include + +namespace IceMetrics +{ + +class MetricsAdminI; +typedef IceUtil::Handle MetricsAdminIPtr; + +template class ObjectObserverI : virtual public Ice::ObjectObserver +{ +public: + + typedef T Type; + typedef IceUtil::Handle PtrType; + typedef std::vector SeqType; + + ObjectObserverI(const std::vector& objects) + { + for(std::vector::const_iterator p = objects.begin(); p != objects.end(); ++p) + { + _objects.push_back(PtrType::dynamicCast(*p)); + } + } + + virtual void + attach() + { + for(typename SeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p) + { + ++(*p)->total; + ++(*p)->current; + } + } + + virtual void + detach() + { + forEach(decMember(&MetricsObject::current)); + } + + template void + forEach(Function func) + { + for(typename SeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p) + { + func(*p); + } + } + + void + update(ObjectObserverI* old) + { + typename SeqType::const_iterator q = old->_objects.begin(); + for(typename SeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p) + { + if(*p != *q) + { + ++(*p)->total; + ++(*p)->current; + } + else + { + ++q; + } + } + } + +private: + + SeqType _objects; +}; + +class ConnectionObserverI : public Ice::ConnectionObserver, public ObjectObserverI +{ +public: + + ConnectionObserverI(const std::vector& objects) : ObjectObserverI(objects) + { + } + + virtual void attach(); + virtual void detach(); + + virtual void stateChanged(Ice::ConnectionState, Ice::ConnectionState); + virtual void sentBytes(Ice::Int, Ice::Long); + virtual void receivedBytes(Ice::Int, Ice::Long); +}; + +template +class ObjectObserverResolver +{ +public: + + typedef IceUtil::Handle TPtr; + typedef std::map, TPtr> ObserverMap; + + template T* + getObserver(const std::vector& objects, S* oldObserver) + { + if(objects.empty()) + { + return 0; + } + + typename ObserverMap::const_iterator p = _stats.find(objects); + if(p == _stats.end()) + { + p = _stats.insert(make_pair(objects, new T(objects))).first; + } + + T* newObserver = p->second.get(); + if(oldObserver && static_cast(newObserver) != oldObserver) + { + newObserver->update(dynamic_cast(oldObserver)); + } + return newObserver; + } + +private: + + ObserverMap _stats; +}; + +class ObjectObserverUpdater : public IceUtil::Shared +{ +public: + + virtual void update() = 0; +}; +typedef IceUtil::Handle ObjectObserverUpdaterPtr; + +class ObserverResolverI : public Ice::ObserverResolver +{ +public: + + ObserverResolverI(const MetricsAdminIPtr&); + + virtual void setObserverUpdater(const Ice::ObserverUpdaterPtr&); + + virtual Ice::ConnectionObserverPtr + getConnectionObserver(const Ice::ConnectionObserverPtr&, const Ice::ConnectionPtr&); + + virtual Ice::ObjectObserverPtr + getThreadObserver(const Ice::ObjectObserverPtr&, const std::string&, const std::string&); + + virtual Ice::ThreadPoolThreadObserverPtr + getThreadPoolThreadObserver(const Ice::ThreadPoolThreadObserverPtr&, const std::string&, const std::string&); + + virtual Ice::RequestObserverPtr + getInvocationObserver(const Ice::RequestObserverPtr&, const Ice::ObjectPrx&, const std::string&); + + virtual Ice::RequestObserverPtr + getDispatchObserver(const Ice::RequestObserverPtr&, const Ice::ObjectPtr&, const Ice::Current&); + +private: + + const MetricsAdminIPtr _metrics; + + ObjectObserverResolver _connections; +}; + +} + +#endif -- cgit v1.2.3