diff options
-rw-r--r-- | cpp/include/Ice/Initialize.h | 2 | ||||
-rw-r--r-- | cpp/include/IceUtil/StopWatch.h | 49 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 105 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 8 | ||||
-rw-r--r-- | cpp/src/Ice/Makefile | 10 | ||||
-rw-r--r-- | cpp/src/Ice/MetricsAdminI.cpp | 198 | ||||
-rw-r--r-- | cpp/src/Ice/MetricsAdminI.h | 107 | ||||
-rw-r--r-- | cpp/src/Ice/MetricsObserverI.cpp | 188 | ||||
-rw-r--r-- | cpp/src/Ice/MetricsObserverI.h | 179 | ||||
-rw-r--r-- | slice/Ice/Metrics.ice | 112 | ||||
-rw-r--r-- | slice/Ice/Observer.ice | 84 | ||||
-rw-r--r-- | slice/Ice/ObserverF.ice | 25 | ||||
-rw-r--r-- | slice/Ice/Stats.ice | 2 |
13 files changed, 1065 insertions, 4 deletions
diff --git a/cpp/include/Ice/Initialize.h b/cpp/include/Ice/Initialize.h index 57a0fc55a56..58b8a72adce 100644 --- a/cpp/include/Ice/Initialize.h +++ b/cpp/include/Ice/Initialize.h @@ -16,6 +16,7 @@ #include <Ice/LoggerF.h> #include <Ice/StreamF.h> #include <Ice/StatsF.h> +#include <Ice/ObserverF.h> #include <Ice/Dispatcher.h> #include <Ice/StringConverter.h> #include <Ice/BuiltinSequences.h> @@ -84,6 +85,7 @@ struct InitializationData PropertiesPtr properties; LoggerPtr logger; StatsPtr stats; + ObserverResolverPtr observerResolver; StringConverterPtr stringConverter; WstringConverterPtr wstringConverter; ThreadNotificationPtr threadHook; diff --git a/cpp/include/IceUtil/StopWatch.h b/cpp/include/IceUtil/StopWatch.h new file mode 100644 index 00000000000..1ffabf199a1 --- /dev/null +++ b/cpp/include/IceUtil/StopWatch.h @@ -0,0 +1,49 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_UTIL_STOPWATCH_H +#define ICE_UTIL_STOPWATCH_H + +#include <IceUtil/Time.h> + +namespace IceUtilInternal +{ + +class ICE_UTIL_API StopWatch +{ +public: + + StopWatch() { } + + void start() + { + _s = IceUtil::Time::now(IceUtil::Time::Monotonic); + } + + long stop() + { + assert(isStarted()); + long d = (IceUtil::Time::now(IceUtil::Time::Monotonic) - _s).toMicroSeconds(); + _s = IceUtil::Time(); + return d; + } + + bool isStarted() const + { + return _s != IceUtil::Time(); + } + +private: + + IceUtil::Time _s; +}; + +} // End namespace IceUtilInternal + +#endif diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index e1a8e03dd82..7e619ba2e0f 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -25,6 +25,7 @@ #include <Ice/LocalException.h> #include <Ice/ReferenceFactory.h> // For createProxy(). #include <Ice/ProxyFactory.h> // For createProxy(). +#include <Ice/Observer.h> #include <bzlib.h> using namespace std; @@ -118,6 +119,16 @@ private: ConnectionIPtr _connection; }; +Ice::ConnectionState connectionStateMap[] = { + Ice::ConnectionStateInitializing, // StateNotInitialized + Ice::ConnectionStateInitializing, // StateNotValidated + Ice::ConnectionStateActive, // StateActive + Ice::ConnectionStateHolding, // StateHolding + Ice::ConnectionStateClosing, // StateClosing + Ice::ConnectionStateClosed, // StateClosed + Ice::ConnectionStateClosed, // StateFinished +}; + } void @@ -1145,6 +1156,13 @@ Ice::ConnectionI::startAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { + if(_observer) + { + assert(!_writeWatch.isStarted()); + _writeStreamPos = _writeStream.i; + _writeWatch.start(); + } + if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty()) { // The whole message is written, assume it's sent now for at-most-once semantics. @@ -1153,6 +1171,13 @@ Ice::ConnectionI::startAsync(SocketOperation operation) } else if(operation & SocketOperationRead) { + if(_observer && !_readHeader) + { + assert(!_readWatch.isStarted()); + _readStreamPos = _readStream.i; + _readWatch.start(); + } + _transceiver->startRead(_readStream); } } @@ -1171,10 +1196,20 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { + if(_observer) + { + assert(_writeWatch.isStarted() && _writeStream.i >= _writeStreamPos); + _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + } _transceiver->finishWrite(_writeStream); } else if(operation & SocketOperationRead) { + if(_observer && !_readHeader) + { + assert(_readStream.i >= _readStreamPos); + _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop()); + } _transceiver->finishRead(_readStream); } } @@ -1219,12 +1254,29 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) unscheduleTimeout(current.operation); if(current.operation & SocketOperationWrite && !_writeStream.b.empty()) { + if(_observer && _writeStream.i != _writeStream.b.end()) + { + if(_writeWatch.isStarted()) + { + assert(_writeStream.i >= _writeStreamPos); + _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + } + _writeStreamPos = _writeStream.i; + _writeWatch.start(); + } + if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream)) { assert(!_writeStream.b.empty()); scheduleTimeout(SocketOperationWrite, _endpoint->timeout()); return; } + + if(_observer) + { + assert(_writeStream.i >= _writeStreamPos); + _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + } assert(_writeStream.i == _writeStream.b.end()); } if(current.operation & SocketOperationRead && !_readStream.b.empty()) @@ -1237,6 +1289,17 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } assert(_readStream.i == _readStream.b.end()); _readHeader = false; + + if(_observer) + { + // + // We can't measure the time to receive the header as it would + // include the wait time. We start the timer now. + // + _observer->receivedBytes(static_cast<int>(_readStream.i - _readStream.b.begin()), 0); + _readStreamPos = _readStream.i; + _readWatch.start(); + } ptrdiff_t pos = _readStream.i - _readStream.b.begin(); if(pos < headerSize) @@ -1284,6 +1347,17 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) _readStream.i = _readStream.b.begin() + pos; } + if(_observer && _readStream.i != _readStream.b.end()) + { + if(_readWatch.isStarted()) + { + assert(_readStream.i >= _readStreamPos); + _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop()); + } + _readStreamPos = _readStream.i; + _readWatch.start(); + } + if(_readStream.i != _readStream.b.end()) { if(_endpoint->datagram()) @@ -1301,6 +1375,12 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) assert(_readStream.i == _readStream.b.end()); } } + + if(_observer) + { + assert(_readStream.i >= _readStreamPos); + _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop()); + } } if(_state <= StateNotValidated) @@ -1778,7 +1858,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _state(StateNotInitialized), _shutdownInitiated(false) { - int& compressionLevel = const_cast<int&>(_compressionLevel); compressionLevel = _instance->initializationData().properties->getPropertyAsIntWithDefault( "Ice.Compression.Level", 1); @@ -1826,6 +1905,11 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool(); } _threadPool->initialize(this); + + if(_instance->initializationData().observerResolver) + { + _observer = _instance->initializationData().observerResolver->getConnectionObserver(_observer, this); + } } catch(const IceUtil::Exception&) { @@ -1833,10 +1917,20 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, throw; } __setNoDelete(false); + + if(_observer) + { + _observer->attach(); + } } Ice::ConnectionI::~ConnectionI() { + if(_observer) + { + _observer->detach(); + } + assert(!_startCallback); assert(_state == StateFinished); assert(_dispatchCount == 0); @@ -2040,6 +2134,15 @@ Ice::ConnectionI::setState(State state) } } + if(_observer) + { + Ice::ConnectionState oldState = connectionStateMap[static_cast<int>(_state)]; + Ice::ConnectionState newState = connectionStateMap[static_cast<int>(state)]; + if(oldState != newState) + { + _observer->stateChanged(oldState, newState); + } + } _state = state; notifyAll(); diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 8648f8f302c..246cf72e9f2 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -13,6 +13,7 @@ #include <IceUtil/Mutex.h> #include <IceUtil/Monitor.h> #include <IceUtil/Time.h> +#include <IceUtil/StopWatch.h> #include <IceUtil/Timer.h> #include <Ice/CommunicatorF.h> @@ -30,6 +31,7 @@ #include <Ice/OutgoingAsyncF.h> #include <Ice/EventHandler.h> #include <Ice/Dispatcher.h> +#include <Ice/ObserverF.h> #include <deque> #include <memory> @@ -277,6 +279,7 @@ private: const std::string _type; const IceInternal::ConnectorPtr _connector; const IceInternal::EndpointIPtr _endpoint; + Ice::ConnectionObserverPtr _observer; ObjectAdapterPtr _adapter; IceInternal::ServantManagerPtr _servantManager; @@ -324,6 +327,11 @@ private: bool _readHeader; IceInternal::BasicStream _writeStream; + Ice::Byte* _writeStreamPos; + IceUtilInternal::StopWatch _writeWatch; + Ice::Byte* _readStreamPos; + IceUtilInternal::StopWatch _readWatch; + int _dispatchCount; State _state; // The current state. diff --git a/cpp/src/Ice/Makefile b/cpp/src/Ice/Makefile index 70da4e66b31..00b5893bf4b 100644 --- a/cpp/src/Ice/Makefile +++ b/cpp/src/Ice/Makefile @@ -60,6 +60,9 @@ OBJS = Acceptor.o \ LoggerI.o \ Logger.o \ LoggerUtil.o \ + Metrics.o \ + MetricsAdminI.o \ + MetricsObserverI.o \ Network.o \ ObjectAdapterFactory.o \ ObjectAdapterI.o \ @@ -67,6 +70,7 @@ OBJS = Acceptor.o \ ObjectFactoryManager.o \ ObjectFactory.o \ Object.o \ + Observer.o \ OpaqueEndpointI.o \ OutgoingAsync.o \ Outgoing.o \ @@ -132,10 +136,13 @@ SLICE_SRCS = $(SDIR)/BuiltinSequences.ice \ $(SDIR)/Locator.ice \ $(SDIR)/LoggerF.ice \ $(SDIR)/Logger.ice \ + $(SDIR)/Metrics.ice \ $(SDIR)/ObjectAdapterF.ice \ $(SDIR)/ObjectAdapter.ice \ $(SDIR)/ObjectFactoryF.ice \ $(SDIR)/ObjectFactory.ice \ + $(SDIR)/Observer.ice \ + $(SDIR)/ObserverF.ice \ $(SDIR)/PluginF.ice \ $(SDIR)/Plugin.ice \ $(SDIR)/ProcessF.ice \ @@ -149,8 +156,7 @@ SLICE_SRCS = $(SDIR)/BuiltinSequences.ice \ $(SDIR)/SliceChecksumDict.ice \ $(SDIR)/StatsF.ice \ $(SDIR)/Stats.ice \ - $(SDIR)/Version.ice - + $(SDIR)/Version.ice \ HDIR = $(headerdir)/Ice SDIR = $(slicedir)/Ice diff --git a/cpp/src/Ice/MetricsAdminI.cpp b/cpp/src/Ice/MetricsAdminI.cpp new file mode 100644 index 00000000000..c30e298bafb --- /dev/null +++ b/cpp/src/Ice/MetricsAdminI.cpp @@ -0,0 +1,198 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/MetricsAdminI.h> + +using namespace std; +using namespace Ice; +using namespace IceMetrics; + + +MetricsMap::MetricsMap(const string& groupBy, const NameValueDict& accept, const NameValueDict& reject) : + _accept(accept), _reject(reject) +{ + // TODO: groupBy +} + +void +MetricsMap::destroy() +{ + _objects.clear(); +} + +MetricsObjectSeq +MetricsMap::getMetricsObjects() const +{ + MetricsObjectSeq objects; + for(map<string, MetricsObjectPtr>::const_iterator p = _objects.begin(); p != _objects.end(); ++p) + { + // TODO: Fix ice_clone! + objects.push_back(dynamic_cast<MetricsObject*>(p->second->ice_clone().get())); + } + return objects; +} + +MetricsMap::Entry +MetricsMap::getMatching(const MetricsHelper& helper) +{ + for(map<string, string>::const_iterator p = _accept.begin(); p != _accept.end(); ++p) + { + if(!match(helper(p->first), p->second)) + { + return MetricsMap::Entry(); + } + } + + for(map<string, string>::const_iterator p = _reject.begin(); p != _reject.end(); ++p) + { + if(match(helper(p->first), p->second)) + { + return MetricsMap::Entry(); + } + } + + ostringstream os; + vector<string>::const_iterator q = _groupBySeparators.begin(); + for(vector<string>::const_iterator p = _groupByAttributes.begin(); p != _groupByAttributes.end(); ++p) + { + os << helper(*p) << *q; + } + + string key = os.str(); + map<string, Entry>::const_iterator p = _objects.find(key); + if(p == _objects.end()) + { + Entry e; + e.object = helper.newMetricsObject(); + e.object->id = key; + e.map = this; + p = _objects.insert(make_pair(os.str(), e)).first; + } + return p->second; +} + +bool +MetricsMap::match(const string& value, const string& expr) const +{ + return true; // TODO +} + +MetricsView::MetricsView() +{ +} + +void +MetricsView::add(const string& cl, const string& groupBy, const NameValueDict& accept, const NameValueDict& reject) +{ + _maps.insert(make_pair(cl, new MetricsMap(groupBy, accept, reject))); +} + +void +MetricsView::remove(const string& cl) +{ + _maps.erase(cl); +} + +MetricsObjectSeqDict +MetricsView::getMetricsObjects() const +{ + MetricsObjectSeqDict metrics; + for(map<string, MetricsMapPtr>::const_iterator p = _maps.begin(); p != _maps.end(); ++p) + { + metrics.insert(make_pair(p->first, p->second->getMetricsObjects())); + } + return metrics; +} + +MetricsMap::Entry +MetricsView::getMatching(const string& cl, const MetricsHelper& helper) const +{ + map<string, MetricsMapPtr>::const_iterator p = _maps.find(cl); + if(p != _maps.end()) + { + return p->second->getMatching(helper); + } + return MetricsMap::Entry() +} + +void +MetricsAdminI::addUpdater(const string& cl, const ObjectObserverUpdaterPtr& updater) +{ + _updaters.insert(make_pair(cl, updater)); +} + +vector<MetricsMap::Entry> +MetricsAdminI::getMatching(const string& cl, const MetricsHelper& helper) const +{ + Lock sync(*this); + vector<MetricsMap::Entry> objects; + for(map<string, MetricsViewPtr>::const_iterator p = _views.begin(); p != _views.end(); ++p) + { + MetricsMap::Entry e = p->second->getMatching(cl, helper); + if(e.object) + { + objects.push_back(e); + } + } + return objects; +} + +MetricsObjectSeqDict +MetricsAdminI::getMetrics(const string& view, const Ice::Current&) +{ + Lock sync(*this); + std::map<string, MetricsViewPtr>::const_iterator p = _views.find(view); + if(p == _views.end()) + { + throw UnknownMetricsView(); + } + return p->second->getMetricsObjects(); +} + +MetricsObjectSeqDictDict +MetricsAdminI::getAllMetrics(const Ice::Current&) +{ + Lock sync(*this); + MetricsObjectSeqDictDict metrics; + for(map<string, MetricsViewPtr>::const_iterator p = _views.begin(); p != _views.end(); ++p) + { + metrics.insert(make_pair(p->first, p->second->getMetricsObjects())); + } + return metrics; +} + +void +MetricsAdminI::addClassToView(const string& view, + const string& cl, + const string& groupBy, + const NameValueDict& accept, + const NameValueDict& reject, + const Ice::Current&) +{ + { + Lock sync(*this); + map<string, MetricsViewPtr>::const_iterator p = _views.find(view); + if(p == _views.end()) + { + p = _views.insert(make_pair(view, new MetricsView())).first; + } + p->second->add(cl, groupBy, accept, reject); + } + _updaters[cl]->update(); +} + +void +MetricsAdminI::removeClassFromView(const string& view, const string& cl, const Ice::Current&) +{ + { + Lock sync(*this); + _views.erase(cl); + } + _updaters[cl]->update(); +} diff --git a/cpp/src/Ice/MetricsAdminI.h b/cpp/src/Ice/MetricsAdminI.h new file mode 100644 index 00000000000..a021ae77845 --- /dev/null +++ b/cpp/src/Ice/MetricsAdminI.h @@ -0,0 +1,107 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_METRICSADMIN_I_H +#define ICE_METRICSADMIN_I_H + +#include <Ice/Metrics.h> +#include <Ice/Stats.h> + +#include <Ice/MetricsObserverI.h> + +namespace IceMetrics +{ + +class MetricsHelper +{ +public: + + virtual std::string operator()(const std::string&) const = 0; + + virtual MetricsObjectPtr newMetricsObject() const = 0; +}; + +class MetricsMap : public IceUtil::Shared, IceUtil::Mutex +{ +public: + + struct Entry + { + MetricsObjectPtr object; + MetricsMapPtr map; + + bool operator<(const Entry& e) + { + return object < e.object; + } + }; + + MetricsMap(const std::string&, const NameValueDict&, const NameValueDict&); + + void destroy(); + + MetricsObjectSeq getMetricsObjects() const; + + MetricsObjectPtr getMatching(const MetricsHelper&); + +private: + + bool match(const std::string&, const std::string&) const; + + const std::vector<std::string> _groupByAttributes; + const std::vector<std::string> _groupBySeparators; + const NameValueDict _accept; + const NameValueDict _reject; + std::map<std::string, MetricsObjectPtr> _objects; +}; +typedef IceUtil::Handle<MetricsMap> MetricsMapPtr; + +class MetricsView : public IceUtil::Shared +{ +public: + + MetricsView(); + + void add(const std::string&, const std::string&, const NameValueDict&, const NameValueDict&); + void remove(const std::string&); + + MetricsObjectSeqDict getMetricsObjects() const; + + MetricsObjectPtr getMatching(const std::string&, const MetricsHelper&) const; + +private: + + std::map<std::string, MetricsMapPtr> _maps; +}; +typedef IceUtil::Handle<MetricsView> MetricsViewPtr; + +class MetricsAdminI : public MetricsAdmin, public IceUtil::Mutex +{ +public: + + MetricsObjectSeq getMatching(const std::string&, const MetricsHelper&) const; + void addUpdater(const std::string&, const ObjectObserverUpdaterPtr&); + + virtual MetricsObjectSeqDict getMetrics(const std::string&, const Ice::Current&); + virtual MetricsObjectSeqDictDict getAllMetrics(const Ice::Current&); + + virtual void addClassToView(const std::string&, const std::string&, const std::string&, const NameValueDict&, + const NameValueDict&, const Ice::Current&); + + virtual void removeClassFromView(const std::string&, const std::string&, const Ice::Current&); + +private: + + std::map<std::string, MetricsViewPtr> _views; + std::map<std::string, ObjectObserverUpdaterPtr> _updaters; +}; + +}; + +#endif diff --git a/cpp/src/Ice/MetricsObserverI.cpp b/cpp/src/Ice/MetricsObserverI.cpp new file mode 100644 index 00000000000..c3e51b170ba --- /dev/null +++ b/cpp/src/Ice/MetricsObserverI.cpp @@ -0,0 +1,188 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/MetricsObserverI.h> +#include <Ice/MetricsAdminI.h> + +using namespace std; +using namespace Ice; +using namespace IceMetrics; + +namespace +{ + +class ConnectionMetricsHelper : public MetricsHelper +{ +public: + + ConnectionMetricsHelper(const ConnectionPtr& con) : _connection(con) + { + } + + virtual string operator()(const string& attribute) const + { + return ""; // TODO: return attribute value + } + + virtual MetricsObjectPtr newMetricsObject() const + { + return new ConnectionMetricsObject(); + } + +private: + + Ice::ConnectionPtr _connection; +}; + +} + +void +ConnectionObserverI::attach() +{ + struct Attach + { + void operator()(const ConnectionMetricsObjectPtr& v) + { + ++v->total; + ++v->current; + ++v->initializing; + } + }; + forEach(Attach()); +} + +void +ConnectionObserverI::detach() +{ + struct Detach + { + void operator()(const ConnectionMetricsObjectPtr& v) + { + ++v->total; + ++v->current; + ++v->initializing; + } + }; + forEach(Detach()); +} + +void +ConnectionObserverI::stateChanged(ConnectionState oldState, ConnectionState newState) +{ + struct StateChanged + { + StateChanged(ConnectionState oldState, ConnectionState newState) : + oldState(oldState), newState(newState) + { + } + + void operator()(const ConnectionMetricsObjectPtr& v) + { + --(v.get()->*getConnectionStateMetric(oldState)); + ++(v.get()->*getConnectionStateMetric(newState)); + } + + int ConnectionMetricsObject::* + getConnectionStateMetric(ConnectionState s) + { + switch(s) + { + case ConnectionStateInitializing: + return &ConnectionMetricsObject::initializing; + case ConnectionStateActive: + return &ConnectionMetricsObject::active; + case ConnectionStateHolding: + return &ConnectionMetricsObject::holding; + case ConnectionStateClosing: + return &ConnectionMetricsObject::closing; + case ConnectionStateClosed: + return &ConnectionMetricsObject::closed; + } + } + + ConnectionState oldState; + ConnectionState newState; + } + forEach(StateChanged(oldState, newState)); +} + +void +ConnectionObserverI::sentBytes(Int num, Long duration) +{ + forEach(aggregate(applyOnMember(&ConnectionMetricsObject::sentBytes, Add<Int>(num)), + applyOnMember(&ConnectionMetricsObject::sentTime, Add<Long>(duration)))); +} + +void +ConnectionObserverI::receivedBytes(Int num, Long duration) +{ + forEach(aggregate(applyOnMember(&ConnectionMetricsObject::receivedBytes, Add<Int>(num)), + applyOnMember(&ConnectionMetricsObject::receivedTime, Add<Long>(duration)))); +} + +ObserverResolverI::ObserverResolverI(const MetricsAdminIPtr& metrics) : _metrics(metrics) +{ +} + +void +ObserverResolverI::setObserverUpdater(const ObserverUpdaterPtr& updater) +{ + class Updater : public ObjectObserverUpdater + { + public: + + Updater(const ObserverUpdaterPtr& updater, void (ObserverUpdater::*fn)()) : + _updater(updater), _fn(fn) + { + } + + virtual void update() + { + (_updater.get()->*_fn)(); + } + + private: + + const ObserverUpdaterPtr _updater; + void (ObserverUpdater::*_fn)(); + }; + _metrics->addUpdater("Connection", new Updater(updater, &ObserverUpdater::updateConnectionObservers)); + _metrics->addUpdater("Thread", new Updater(updater, &ObserverUpdater::updateThreadObservers)); + _metrics->addUpdater("ThreadPoolThread", new Updater(updater, &ObserverUpdater::updateThreadPoolThreadObservers)); +} + +ConnectionObserverPtr +ObserverResolverI::getConnectionObserver(const ConnectionObserverPtr& old, const ConnectionPtr& con) +{ + return _connections.getObserver(_metrics->getMatching("Connection", ConnectionMetricsHelper(con)), old.get()); +} + +ObjectObserverPtr +ObserverResolverI::getThreadObserver(const ObjectObserverPtr&, const string&, const string&) +{ + return 0; +} + +ThreadPoolThreadObserverPtr +ObserverResolverI::getThreadPoolThreadObserver(const ThreadPoolThreadObserverPtr&, const string&, const string&) +{ + return 0; +} + +RequestObserverPtr +ObserverResolverI::getInvocationObserver(const RequestObserverPtr&, const ObjectPrx&, const string&) +{ + return 0; +} + +RequestObserverPtr +ObserverResolverI::getDispatchObserver(const RequestObserverPtr&, const ObjectPtr&, const Current&) +{ + return 0; +} diff --git a/cpp/src/Ice/MetricsObserverI.h b/cpp/src/Ice/MetricsObserverI.h new file mode 100644 index 00000000000..7b5b4abbf5a --- /dev/null +++ b/cpp/src/Ice/MetricsObserverI.h @@ -0,0 +1,179 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#ifndef ICE_METRICSSTATS_I_H +#define ICE_METRICSSTATS_I_H + +#include <Ice/Observer.h> +#include <Ice/Metrics.h> + +#include <Ice/MetricsFunctional.h> + +namespace IceMetrics +{ + +class MetricsAdminI; +typedef IceUtil::Handle<MetricsAdminI> MetricsAdminIPtr; + +template<class T> class ObjectObserverI : virtual public Ice::ObjectObserver +{ +public: + + typedef T Type; + typedef IceUtil::Handle<T> PtrType; + typedef std::vector<PtrType> SeqType; + + ObjectObserverI(const std::vector<MetricsObjectPtr>& objects) + { + for(std::vector<MetricsObjectPtr>::const_iterator p = objects.begin(); p != objects.end(); ++p) + { + _objects.push_back(PtrType::dynamicCast(*p)); + } + } + + virtual void + attach() + { + for(typename SeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p) + { + ++(*p)->total; + ++(*p)->current; + } + } + + virtual void + detach() + { + forEach(decMember(&MetricsObject::current)); + } + + template<typename Function> void + forEach(Function func) + { + for(typename SeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p) + { + func(*p); + } + } + + void + update(ObjectObserverI* old) + { + typename SeqType::const_iterator q = old->_objects.begin(); + for(typename SeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p) + { + if(*p != *q) + { + ++(*p)->total; + ++(*p)->current; + } + else + { + ++q; + } + } + } + +private: + + SeqType _objects; +}; + +class ConnectionObserverI : public Ice::ConnectionObserver, public ObjectObserverI<ConnectionMetricsObject> +{ +public: + + ConnectionObserverI(const std::vector<MetricsObjectPtr>& objects) : ObjectObserverI(objects) + { + } + + virtual void attach(); + virtual void detach(); + + virtual void stateChanged(Ice::ConnectionState, Ice::ConnectionState); + virtual void sentBytes(Ice::Int, Ice::Long); + virtual void receivedBytes(Ice::Int, Ice::Long); +}; + +template<typename T> +class ObjectObserverResolver +{ +public: + + typedef IceUtil::Handle<T> TPtr; + typedef std::map<std::vector<MetricsObjectPtr>, TPtr> ObserverMap; + + template<typename S> T* + getObserver(const std::vector<MetricsMap::Entry>& objects, S* oldObserver) + { + if(objects.empty()) + { + return 0; + } + + typename ObserverMap::const_iterator p = _stats.find(objects); + if(p == _stats.end()) + { + p = _stats.insert(make_pair(objects, new T(objects))).first; + } + + T* newObserver = p->second.get(); + if(oldObserver && static_cast<S*>(newObserver) != oldObserver) + { + newObserver->update(dynamic_cast<T*>(oldObserver)); + } + return newObserver; + } + +private: + + ObserverMap _stats; +}; + +class ObjectObserverUpdater : public IceUtil::Shared +{ +public: + + virtual void update() = 0; +}; +typedef IceUtil::Handle<ObjectObserverUpdater> ObjectObserverUpdaterPtr; + +class ObserverResolverI : public Ice::ObserverResolver +{ +public: + + ObserverResolverI(const MetricsAdminIPtr&); + + virtual void setObserverUpdater(const Ice::ObserverUpdaterPtr&); + + virtual Ice::ConnectionObserverPtr + getConnectionObserver(const Ice::ConnectionObserverPtr&, const Ice::ConnectionPtr&); + + virtual Ice::ObjectObserverPtr + getThreadObserver(const Ice::ObjectObserverPtr&, const std::string&, const std::string&); + + virtual Ice::ThreadPoolThreadObserverPtr + getThreadPoolThreadObserver(const Ice::ThreadPoolThreadObserverPtr&, const std::string&, const std::string&); + + virtual Ice::RequestObserverPtr + getInvocationObserver(const Ice::RequestObserverPtr&, const Ice::ObjectPrx&, const std::string&); + + virtual Ice::RequestObserverPtr + getDispatchObserver(const Ice::RequestObserverPtr&, const Ice::ObjectPtr&, const Ice::Current&); + +private: + + const MetricsAdminIPtr _metrics; + + ObjectObserverResolver<ConnectionObserverI> _connections; +}; + +} + +#endif diff --git a/slice/Ice/Metrics.ice b/slice/Ice/Metrics.ice new file mode 100644 index 00000000000..fffc0ad3e43 --- /dev/null +++ b/slice/Ice/Metrics.ice @@ -0,0 +1,112 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +module IceMetrics +{ + +class MetricsObject +{ + string id; + int total; + int current; +}; + +sequence<MetricsObject> MetricsObjectSeq; +dictionary<string, MetricsObjectSeq> MetricsObjectSeqDict; +dictionary<string, MetricsObjectSeqDict> MetricsObjectSeqDictDict; + +dictionary<string, string> NameValueDict; + +exception UnknownMetricsView +{ +}; + +interface MetricsAdmin +{ + /** + * + * Get the metrics objects for the given metrics view. This + * returns a map of metric maps for each metrics class configured + * with the view. + * + **/ + MetricsObjectSeqDict getMetrics(string view) + throws UnknownMetricsView; + + /** + * + * Get all the metrics objects for the given metrics class. This + * returns a map of metrics view. + * + **/ + MetricsObjectSeqDictDict getAllMetrics(); + + /** + * + * Monitor a new metrics class with the given view. + * + **/ + void addClassToView(string name, string cl, string groupBy, NameValueDict acceptFilter, NameValueDict rejectFilter); + + /** + * + * No longer monitor the given metrics class with the given view. + * + **/ + void removeClassFromView(string name, string cl) + throws UnknownMetricsView; +}; + +local interface MetricsViewProvider +{ + void addView(string name, string value, NameValueDict accept, NameValueDict reject); + void removeView(string name); + MetricsObjectSeq getView(string view); +}; + +local interface MetricsViewProviderManager +{ + void add(string cl, MetricsViewProvider provider); + void remove(string cl); +}; + +class ThreadPoolThreadMetricsObject extends MetricsObject +{ + int inUse; + int inUseForIO; +}; + +class RequestMetricsObject extends MetricsObject +{ + int responseOK; + int responseUserException; + int responseSytemException; + int failure; + + long marshalTime; + long unmarshalTime; + long time; +}; + +class ConnectionMetricsObject extends MetricsObject +{ + int initializing; + int holding; + int active; + int closing; + int closed; + + long receivedBytes; + long receivedTime; + + long sentBytes; + long sentTime; +}; + +};
\ No newline at end of file diff --git a/slice/Ice/Observer.ice b/slice/Ice/Observer.ice new file mode 100644 index 00000000000..76237f35104 --- /dev/null +++ b/slice/Ice/Observer.ice @@ -0,0 +1,84 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#pragma once + +[["cpp:header-ext:h"]] + +#include <Ice/ConnectionF.ice> +#include <Ice/Current.ice> + +module Ice +{ + +local interface ObjectObserver +{ + void attach(); + void detach(); +}; + +enum ThreadState +{ + ThreadIddle, + ThreadInUse, + ThreadInUseForIO, +}; + +local interface ThreadPoolThreadObserver extends ObjectObserver +{ + void stateChanged(ThreadState oldState, ThreadState newState); +}; + +local interface RequestObserver extends ObjectObserver +{ + void reponseOK(); + void reponseUserException(); + void reponseSystemException(); + void failure(); + void marshalTime(long time); + void unmarshalTime(long time); + void callTime(long time); +}; + +enum ConnectionState +{ + ConnectionStateInitializing, + ConnectionStateHolding, + ConnectionStateActive, + ConnectionStateClosing, + ConnectionStateClosed +}; + +local interface ConnectionObserver extends ObjectObserver +{ + void stateChanged(ConnectionState oldState, ConnectionState newState); + void sentBytes(int num, long duration); + void receivedBytes(int num, long duration); +}; + +local interface ObserverUpdater +{ + void updateConnectionObservers(); + void updateThreadObservers(); + void updateThreadPoolThreadObservers(); +}; + +local interface ObserverResolver +{ + void setObserverUpdater(ObserverUpdater refresher); + + ConnectionObserver getConnectionObserver(ConnectionObserver old, Connection con); + ObjectObserver getThreadObserver(ObjectObserver old, string parent, string id); + ThreadPoolThreadObserver getThreadPoolThreadObserver(ThreadPoolThreadObserver old, string parent, string id); + RequestObserver getInvocationObserver(RequestObserver old, Object* prx, string operation); + RequestObserver getDispatchObserver(RequestObserver old, Object obj, Current c); +}; + +}; + diff --git a/slice/Ice/ObserverF.ice b/slice/Ice/ObserverF.ice new file mode 100644 index 00000000000..5118ffcbaf1 --- /dev/null +++ b/slice/Ice/ObserverF.ice @@ -0,0 +1,25 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#pragma once + +[["cpp:header-ext:h"]] + +module Ice +{ + +local interface ObjectObserver; +local interface ThreadPoolThreadObserver; +local interface Requesttats; +local interface ConnectionObserver; + +local interface ObserverResolver; + +}; + diff --git a/slice/Ice/Stats.ice b/slice/Ice/Stats.ice index 072fd5c6bd2..aa33879c66c 100644 --- a/slice/Ice/Stats.ice +++ b/slice/Ice/Stats.ice @@ -47,6 +47,6 @@ local interface Stats **/ void bytesReceived(string protocol, int num); }; - + }; |