summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/include/Ice/Initialize.h2
-rw-r--r--cpp/include/IceUtil/StopWatch.h49
-rw-r--r--cpp/src/Ice/ConnectionI.cpp105
-rw-r--r--cpp/src/Ice/ConnectionI.h8
-rw-r--r--cpp/src/Ice/Makefile10
-rw-r--r--cpp/src/Ice/MetricsAdminI.cpp198
-rw-r--r--cpp/src/Ice/MetricsAdminI.h107
-rw-r--r--cpp/src/Ice/MetricsObserverI.cpp188
-rw-r--r--cpp/src/Ice/MetricsObserverI.h179
-rw-r--r--slice/Ice/Metrics.ice112
-rw-r--r--slice/Ice/Observer.ice84
-rw-r--r--slice/Ice/ObserverF.ice25
-rw-r--r--slice/Ice/Stats.ice2
13 files changed, 1065 insertions, 4 deletions
diff --git a/cpp/include/Ice/Initialize.h b/cpp/include/Ice/Initialize.h
index 57a0fc55a56..58b8a72adce 100644
--- a/cpp/include/Ice/Initialize.h
+++ b/cpp/include/Ice/Initialize.h
@@ -16,6 +16,7 @@
#include <Ice/LoggerF.h>
#include <Ice/StreamF.h>
#include <Ice/StatsF.h>
+#include <Ice/ObserverF.h>
#include <Ice/Dispatcher.h>
#include <Ice/StringConverter.h>
#include <Ice/BuiltinSequences.h>
@@ -84,6 +85,7 @@ struct InitializationData
PropertiesPtr properties;
LoggerPtr logger;
StatsPtr stats;
+ ObserverResolverPtr observerResolver;
StringConverterPtr stringConverter;
WstringConverterPtr wstringConverter;
ThreadNotificationPtr threadHook;
diff --git a/cpp/include/IceUtil/StopWatch.h b/cpp/include/IceUtil/StopWatch.h
new file mode 100644
index 00000000000..1ffabf199a1
--- /dev/null
+++ b/cpp/include/IceUtil/StopWatch.h
@@ -0,0 +1,49 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#ifndef ICE_UTIL_STOPWATCH_H
+#define ICE_UTIL_STOPWATCH_H
+
+#include <IceUtil/Time.h>
+
+namespace IceUtilInternal
+{
+
+class ICE_UTIL_API StopWatch
+{
+public:
+
+ StopWatch() { }
+
+ void start()
+ {
+ _s = IceUtil::Time::now(IceUtil::Time::Monotonic);
+ }
+
+ long stop()
+ {
+ assert(isStarted());
+ long d = (IceUtil::Time::now(IceUtil::Time::Monotonic) - _s).toMicroSeconds();
+ _s = IceUtil::Time();
+ return d;
+ }
+
+ bool isStarted() const
+ {
+ return _s != IceUtil::Time();
+ }
+
+private:
+
+ IceUtil::Time _s;
+};
+
+} // End namespace IceUtilInternal
+
+#endif
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index e1a8e03dd82..7e619ba2e0f 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -25,6 +25,7 @@
#include <Ice/LocalException.h>
#include <Ice/ReferenceFactory.h> // For createProxy().
#include <Ice/ProxyFactory.h> // For createProxy().
+#include <Ice/Observer.h>
#include <bzlib.h>
using namespace std;
@@ -118,6 +119,16 @@ private:
ConnectionIPtr _connection;
};
+Ice::ConnectionState connectionStateMap[] = {
+ Ice::ConnectionStateInitializing, // StateNotInitialized
+ Ice::ConnectionStateInitializing, // StateNotValidated
+ Ice::ConnectionStateActive, // StateActive
+ Ice::ConnectionStateHolding, // StateHolding
+ Ice::ConnectionStateClosing, // StateClosing
+ Ice::ConnectionStateClosed, // StateClosed
+ Ice::ConnectionStateClosed, // StateFinished
+};
+
}
void
@@ -1145,6 +1156,13 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
{
if(operation & SocketOperationWrite)
{
+ if(_observer)
+ {
+ assert(!_writeWatch.isStarted());
+ _writeStreamPos = _writeStream.i;
+ _writeWatch.start();
+ }
+
if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty())
{
// The whole message is written, assume it's sent now for at-most-once semantics.
@@ -1153,6 +1171,13 @@ Ice::ConnectionI::startAsync(SocketOperation operation)
}
else if(operation & SocketOperationRead)
{
+ if(_observer && !_readHeader)
+ {
+ assert(!_readWatch.isStarted());
+ _readStreamPos = _readStream.i;
+ _readWatch.start();
+ }
+
_transceiver->startRead(_readStream);
}
}
@@ -1171,10 +1196,20 @@ Ice::ConnectionI::finishAsync(SocketOperation operation)
{
if(operation & SocketOperationWrite)
{
+ if(_observer)
+ {
+ assert(_writeWatch.isStarted() && _writeStream.i >= _writeStreamPos);
+ _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop());
+ }
_transceiver->finishWrite(_writeStream);
}
else if(operation & SocketOperationRead)
{
+ if(_observer && !_readHeader)
+ {
+ assert(_readStream.i >= _readStreamPos);
+ _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop());
+ }
_transceiver->finishRead(_readStream);
}
}
@@ -1219,12 +1254,29 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
unscheduleTimeout(current.operation);
if(current.operation & SocketOperationWrite && !_writeStream.b.empty())
{
+ if(_observer && _writeStream.i != _writeStream.b.end())
+ {
+ if(_writeWatch.isStarted())
+ {
+ assert(_writeStream.i >= _writeStreamPos);
+ _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop());
+ }
+ _writeStreamPos = _writeStream.i;
+ _writeWatch.start();
+ }
+
if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream))
{
assert(!_writeStream.b.empty());
scheduleTimeout(SocketOperationWrite, _endpoint->timeout());
return;
}
+
+ if(_observer)
+ {
+ assert(_writeStream.i >= _writeStreamPos);
+ _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop());
+ }
assert(_writeStream.i == _writeStream.b.end());
}
if(current.operation & SocketOperationRead && !_readStream.b.empty())
@@ -1237,6 +1289,17 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
assert(_readStream.i == _readStream.b.end());
_readHeader = false;
+
+ if(_observer)
+ {
+ //
+ // We can't measure the time to receive the header as it would
+ // include the wait time. We start the timer now.
+ //
+ _observer->receivedBytes(static_cast<int>(_readStream.i - _readStream.b.begin()), 0);
+ _readStreamPos = _readStream.i;
+ _readWatch.start();
+ }
ptrdiff_t pos = _readStream.i - _readStream.b.begin();
if(pos < headerSize)
@@ -1284,6 +1347,17 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
_readStream.i = _readStream.b.begin() + pos;
}
+ if(_observer && _readStream.i != _readStream.b.end())
+ {
+ if(_readWatch.isStarted())
+ {
+ assert(_readStream.i >= _readStreamPos);
+ _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop());
+ }
+ _readStreamPos = _readStream.i;
+ _readWatch.start();
+ }
+
if(_readStream.i != _readStream.b.end())
{
if(_endpoint->datagram())
@@ -1301,6 +1375,12 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
assert(_readStream.i == _readStream.b.end());
}
}
+
+ if(_observer)
+ {
+ assert(_readStream.i >= _readStreamPos);
+ _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop());
+ }
}
if(_state <= StateNotValidated)
@@ -1778,7 +1858,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
_state(StateNotInitialized),
_shutdownInitiated(false)
{
-
int& compressionLevel = const_cast<int&>(_compressionLevel);
compressionLevel = _instance->initializationData().properties->getPropertyAsIntWithDefault(
"Ice.Compression.Level", 1);
@@ -1826,6 +1905,11 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool();
}
_threadPool->initialize(this);
+
+ if(_instance->initializationData().observerResolver)
+ {
+ _observer = _instance->initializationData().observerResolver->getConnectionObserver(_observer, this);
+ }
}
catch(const IceUtil::Exception&)
{
@@ -1833,10 +1917,20 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
throw;
}
__setNoDelete(false);
+
+ if(_observer)
+ {
+ _observer->attach();
+ }
}
Ice::ConnectionI::~ConnectionI()
{
+ if(_observer)
+ {
+ _observer->detach();
+ }
+
assert(!_startCallback);
assert(_state == StateFinished);
assert(_dispatchCount == 0);
@@ -2040,6 +2134,15 @@ Ice::ConnectionI::setState(State state)
}
}
+ if(_observer)
+ {
+ Ice::ConnectionState oldState = connectionStateMap[static_cast<int>(_state)];
+ Ice::ConnectionState newState = connectionStateMap[static_cast<int>(state)];
+ if(oldState != newState)
+ {
+ _observer->stateChanged(oldState, newState);
+ }
+ }
_state = state;
notifyAll();
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 8648f8f302c..246cf72e9f2 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -13,6 +13,7 @@
#include <IceUtil/Mutex.h>
#include <IceUtil/Monitor.h>
#include <IceUtil/Time.h>
+#include <IceUtil/StopWatch.h>
#include <IceUtil/Timer.h>
#include <Ice/CommunicatorF.h>
@@ -30,6 +31,7 @@
#include <Ice/OutgoingAsyncF.h>
#include <Ice/EventHandler.h>
#include <Ice/Dispatcher.h>
+#include <Ice/ObserverF.h>
#include <deque>
#include <memory>
@@ -277,6 +279,7 @@ private:
const std::string _type;
const IceInternal::ConnectorPtr _connector;
const IceInternal::EndpointIPtr _endpoint;
+ Ice::ConnectionObserverPtr _observer;
ObjectAdapterPtr _adapter;
IceInternal::ServantManagerPtr _servantManager;
@@ -324,6 +327,11 @@ private:
bool _readHeader;
IceInternal::BasicStream _writeStream;
+ Ice::Byte* _writeStreamPos;
+ IceUtilInternal::StopWatch _writeWatch;
+ Ice::Byte* _readStreamPos;
+ IceUtilInternal::StopWatch _readWatch;
+
int _dispatchCount;
State _state; // The current state.
diff --git a/cpp/src/Ice/Makefile b/cpp/src/Ice/Makefile
index 70da4e66b31..00b5893bf4b 100644
--- a/cpp/src/Ice/Makefile
+++ b/cpp/src/Ice/Makefile
@@ -60,6 +60,9 @@ OBJS = Acceptor.o \
LoggerI.o \
Logger.o \
LoggerUtil.o \
+ Metrics.o \
+ MetricsAdminI.o \
+ MetricsObserverI.o \
Network.o \
ObjectAdapterFactory.o \
ObjectAdapterI.o \
@@ -67,6 +70,7 @@ OBJS = Acceptor.o \
ObjectFactoryManager.o \
ObjectFactory.o \
Object.o \
+ Observer.o \
OpaqueEndpointI.o \
OutgoingAsync.o \
Outgoing.o \
@@ -132,10 +136,13 @@ SLICE_SRCS = $(SDIR)/BuiltinSequences.ice \
$(SDIR)/Locator.ice \
$(SDIR)/LoggerF.ice \
$(SDIR)/Logger.ice \
+ $(SDIR)/Metrics.ice \
$(SDIR)/ObjectAdapterF.ice \
$(SDIR)/ObjectAdapter.ice \
$(SDIR)/ObjectFactoryF.ice \
$(SDIR)/ObjectFactory.ice \
+ $(SDIR)/Observer.ice \
+ $(SDIR)/ObserverF.ice \
$(SDIR)/PluginF.ice \
$(SDIR)/Plugin.ice \
$(SDIR)/ProcessF.ice \
@@ -149,8 +156,7 @@ SLICE_SRCS = $(SDIR)/BuiltinSequences.ice \
$(SDIR)/SliceChecksumDict.ice \
$(SDIR)/StatsF.ice \
$(SDIR)/Stats.ice \
- $(SDIR)/Version.ice
-
+ $(SDIR)/Version.ice \
HDIR = $(headerdir)/Ice
SDIR = $(slicedir)/Ice
diff --git a/cpp/src/Ice/MetricsAdminI.cpp b/cpp/src/Ice/MetricsAdminI.cpp
new file mode 100644
index 00000000000..c30e298bafb
--- /dev/null
+++ b/cpp/src/Ice/MetricsAdminI.cpp
@@ -0,0 +1,198 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/MetricsAdminI.h>
+
+using namespace std;
+using namespace Ice;
+using namespace IceMetrics;
+
+
+MetricsMap::MetricsMap(const string& groupBy, const NameValueDict& accept, const NameValueDict& reject) :
+ _accept(accept), _reject(reject)
+{
+ // TODO: groupBy
+}
+
+void
+MetricsMap::destroy()
+{
+ _objects.clear();
+}
+
+MetricsObjectSeq
+MetricsMap::getMetricsObjects() const
+{
+ MetricsObjectSeq objects;
+ for(map<string, MetricsObjectPtr>::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
+ {
+ // TODO: Fix ice_clone!
+ objects.push_back(dynamic_cast<MetricsObject*>(p->second->ice_clone().get()));
+ }
+ return objects;
+}
+
+MetricsMap::Entry
+MetricsMap::getMatching(const MetricsHelper& helper)
+{
+ for(map<string, string>::const_iterator p = _accept.begin(); p != _accept.end(); ++p)
+ {
+ if(!match(helper(p->first), p->second))
+ {
+ return MetricsMap::Entry();
+ }
+ }
+
+ for(map<string, string>::const_iterator p = _reject.begin(); p != _reject.end(); ++p)
+ {
+ if(match(helper(p->first), p->second))
+ {
+ return MetricsMap::Entry();
+ }
+ }
+
+ ostringstream os;
+ vector<string>::const_iterator q = _groupBySeparators.begin();
+ for(vector<string>::const_iterator p = _groupByAttributes.begin(); p != _groupByAttributes.end(); ++p)
+ {
+ os << helper(*p) << *q;
+ }
+
+ string key = os.str();
+ map<string, Entry>::const_iterator p = _objects.find(key);
+ if(p == _objects.end())
+ {
+ Entry e;
+ e.object = helper.newMetricsObject();
+ e.object->id = key;
+ e.map = this;
+ p = _objects.insert(make_pair(os.str(), e)).first;
+ }
+ return p->second;
+}
+
+bool
+MetricsMap::match(const string& value, const string& expr) const
+{
+ return true; // TODO
+}
+
+MetricsView::MetricsView()
+{
+}
+
+void
+MetricsView::add(const string& cl, const string& groupBy, const NameValueDict& accept, const NameValueDict& reject)
+{
+ _maps.insert(make_pair(cl, new MetricsMap(groupBy, accept, reject)));
+}
+
+void
+MetricsView::remove(const string& cl)
+{
+ _maps.erase(cl);
+}
+
+MetricsObjectSeqDict
+MetricsView::getMetricsObjects() const
+{
+ MetricsObjectSeqDict metrics;
+ for(map<string, MetricsMapPtr>::const_iterator p = _maps.begin(); p != _maps.end(); ++p)
+ {
+ metrics.insert(make_pair(p->first, p->second->getMetricsObjects()));
+ }
+ return metrics;
+}
+
+MetricsMap::Entry
+MetricsView::getMatching(const string& cl, const MetricsHelper& helper) const
+{
+ map<string, MetricsMapPtr>::const_iterator p = _maps.find(cl);
+ if(p != _maps.end())
+ {
+ return p->second->getMatching(helper);
+ }
+ return MetricsMap::Entry()
+}
+
+void
+MetricsAdminI::addUpdater(const string& cl, const ObjectObserverUpdaterPtr& updater)
+{
+ _updaters.insert(make_pair(cl, updater));
+}
+
+vector<MetricsMap::Entry>
+MetricsAdminI::getMatching(const string& cl, const MetricsHelper& helper) const
+{
+ Lock sync(*this);
+ vector<MetricsMap::Entry> objects;
+ for(map<string, MetricsViewPtr>::const_iterator p = _views.begin(); p != _views.end(); ++p)
+ {
+ MetricsMap::Entry e = p->second->getMatching(cl, helper);
+ if(e.object)
+ {
+ objects.push_back(e);
+ }
+ }
+ return objects;
+}
+
+MetricsObjectSeqDict
+MetricsAdminI::getMetrics(const string& view, const Ice::Current&)
+{
+ Lock sync(*this);
+ std::map<string, MetricsViewPtr>::const_iterator p = _views.find(view);
+ if(p == _views.end())
+ {
+ throw UnknownMetricsView();
+ }
+ return p->second->getMetricsObjects();
+}
+
+MetricsObjectSeqDictDict
+MetricsAdminI::getAllMetrics(const Ice::Current&)
+{
+ Lock sync(*this);
+ MetricsObjectSeqDictDict metrics;
+ for(map<string, MetricsViewPtr>::const_iterator p = _views.begin(); p != _views.end(); ++p)
+ {
+ metrics.insert(make_pair(p->first, p->second->getMetricsObjects()));
+ }
+ return metrics;
+}
+
+void
+MetricsAdminI::addClassToView(const string& view,
+ const string& cl,
+ const string& groupBy,
+ const NameValueDict& accept,
+ const NameValueDict& reject,
+ const Ice::Current&)
+{
+ {
+ Lock sync(*this);
+ map<string, MetricsViewPtr>::const_iterator p = _views.find(view);
+ if(p == _views.end())
+ {
+ p = _views.insert(make_pair(view, new MetricsView())).first;
+ }
+ p->second->add(cl, groupBy, accept, reject);
+ }
+ _updaters[cl]->update();
+}
+
+void
+MetricsAdminI::removeClassFromView(const string& view, const string& cl, const Ice::Current&)
+{
+ {
+ Lock sync(*this);
+ _views.erase(cl);
+ }
+ _updaters[cl]->update();
+}
diff --git a/cpp/src/Ice/MetricsAdminI.h b/cpp/src/Ice/MetricsAdminI.h
new file mode 100644
index 00000000000..a021ae77845
--- /dev/null
+++ b/cpp/src/Ice/MetricsAdminI.h
@@ -0,0 +1,107 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#ifndef ICE_METRICSADMIN_I_H
+#define ICE_METRICSADMIN_I_H
+
+#include <Ice/Metrics.h>
+#include <Ice/Stats.h>
+
+#include <Ice/MetricsObserverI.h>
+
+namespace IceMetrics
+{
+
+class MetricsHelper
+{
+public:
+
+ virtual std::string operator()(const std::string&) const = 0;
+
+ virtual MetricsObjectPtr newMetricsObject() const = 0;
+};
+
+class MetricsMap : public IceUtil::Shared, IceUtil::Mutex
+{
+public:
+
+ struct Entry
+ {
+ MetricsObjectPtr object;
+ MetricsMapPtr map;
+
+ bool operator<(const Entry& e)
+ {
+ return object < e.object;
+ }
+ };
+
+ MetricsMap(const std::string&, const NameValueDict&, const NameValueDict&);
+
+ void destroy();
+
+ MetricsObjectSeq getMetricsObjects() const;
+
+ MetricsObjectPtr getMatching(const MetricsHelper&);
+
+private:
+
+ bool match(const std::string&, const std::string&) const;
+
+ const std::vector<std::string> _groupByAttributes;
+ const std::vector<std::string> _groupBySeparators;
+ const NameValueDict _accept;
+ const NameValueDict _reject;
+ std::map<std::string, MetricsObjectPtr> _objects;
+};
+typedef IceUtil::Handle<MetricsMap> MetricsMapPtr;
+
+class MetricsView : public IceUtil::Shared
+{
+public:
+
+ MetricsView();
+
+ void add(const std::string&, const std::string&, const NameValueDict&, const NameValueDict&);
+ void remove(const std::string&);
+
+ MetricsObjectSeqDict getMetricsObjects() const;
+
+ MetricsObjectPtr getMatching(const std::string&, const MetricsHelper&) const;
+
+private:
+
+ std::map<std::string, MetricsMapPtr> _maps;
+};
+typedef IceUtil::Handle<MetricsView> MetricsViewPtr;
+
+class MetricsAdminI : public MetricsAdmin, public IceUtil::Mutex
+{
+public:
+
+ MetricsObjectSeq getMatching(const std::string&, const MetricsHelper&) const;
+ void addUpdater(const std::string&, const ObjectObserverUpdaterPtr&);
+
+ virtual MetricsObjectSeqDict getMetrics(const std::string&, const Ice::Current&);
+ virtual MetricsObjectSeqDictDict getAllMetrics(const Ice::Current&);
+
+ virtual void addClassToView(const std::string&, const std::string&, const std::string&, const NameValueDict&,
+ const NameValueDict&, const Ice::Current&);
+
+ virtual void removeClassFromView(const std::string&, const std::string&, const Ice::Current&);
+
+private:
+
+ std::map<std::string, MetricsViewPtr> _views;
+ std::map<std::string, ObjectObserverUpdaterPtr> _updaters;
+};
+
+};
+
+#endif
diff --git a/cpp/src/Ice/MetricsObserverI.cpp b/cpp/src/Ice/MetricsObserverI.cpp
new file mode 100644
index 00000000000..c3e51b170ba
--- /dev/null
+++ b/cpp/src/Ice/MetricsObserverI.cpp
@@ -0,0 +1,188 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/MetricsObserverI.h>
+#include <Ice/MetricsAdminI.h>
+
+using namespace std;
+using namespace Ice;
+using namespace IceMetrics;
+
+namespace
+{
+
+class ConnectionMetricsHelper : public MetricsHelper
+{
+public:
+
+ ConnectionMetricsHelper(const ConnectionPtr& con) : _connection(con)
+ {
+ }
+
+ virtual string operator()(const string& attribute) const
+ {
+ return ""; // TODO: return attribute value
+ }
+
+ virtual MetricsObjectPtr newMetricsObject() const
+ {
+ return new ConnectionMetricsObject();
+ }
+
+private:
+
+ Ice::ConnectionPtr _connection;
+};
+
+}
+
+void
+ConnectionObserverI::attach()
+{
+ struct Attach
+ {
+ void operator()(const ConnectionMetricsObjectPtr& v)
+ {
+ ++v->total;
+ ++v->current;
+ ++v->initializing;
+ }
+ };
+ forEach(Attach());
+}
+
+void
+ConnectionObserverI::detach()
+{
+ struct Detach
+ {
+ void operator()(const ConnectionMetricsObjectPtr& v)
+ {
+ ++v->total;
+ ++v->current;
+ ++v->initializing;
+ }
+ };
+ forEach(Detach());
+}
+
+void
+ConnectionObserverI::stateChanged(ConnectionState oldState, ConnectionState newState)
+{
+ struct StateChanged
+ {
+ StateChanged(ConnectionState oldState, ConnectionState newState) :
+ oldState(oldState), newState(newState)
+ {
+ }
+
+ void operator()(const ConnectionMetricsObjectPtr& v)
+ {
+ --(v.get()->*getConnectionStateMetric(oldState));
+ ++(v.get()->*getConnectionStateMetric(newState));
+ }
+
+ int ConnectionMetricsObject::*
+ getConnectionStateMetric(ConnectionState s)
+ {
+ switch(s)
+ {
+ case ConnectionStateInitializing:
+ return &ConnectionMetricsObject::initializing;
+ case ConnectionStateActive:
+ return &ConnectionMetricsObject::active;
+ case ConnectionStateHolding:
+ return &ConnectionMetricsObject::holding;
+ case ConnectionStateClosing:
+ return &ConnectionMetricsObject::closing;
+ case ConnectionStateClosed:
+ return &ConnectionMetricsObject::closed;
+ }
+ }
+
+ ConnectionState oldState;
+ ConnectionState newState;
+ }
+ forEach(StateChanged(oldState, newState));
+}
+
+void
+ConnectionObserverI::sentBytes(Int num, Long duration)
+{
+ forEach(aggregate(applyOnMember(&ConnectionMetricsObject::sentBytes, Add<Int>(num)),
+ applyOnMember(&ConnectionMetricsObject::sentTime, Add<Long>(duration))));
+}
+
+void
+ConnectionObserverI::receivedBytes(Int num, Long duration)
+{
+ forEach(aggregate(applyOnMember(&ConnectionMetricsObject::receivedBytes, Add<Int>(num)),
+ applyOnMember(&ConnectionMetricsObject::receivedTime, Add<Long>(duration))));
+}
+
+ObserverResolverI::ObserverResolverI(const MetricsAdminIPtr& metrics) : _metrics(metrics)
+{
+}
+
+void
+ObserverResolverI::setObserverUpdater(const ObserverUpdaterPtr& updater)
+{
+ class Updater : public ObjectObserverUpdater
+ {
+ public:
+
+ Updater(const ObserverUpdaterPtr& updater, void (ObserverUpdater::*fn)()) :
+ _updater(updater), _fn(fn)
+ {
+ }
+
+ virtual void update()
+ {
+ (_updater.get()->*_fn)();
+ }
+
+ private:
+
+ const ObserverUpdaterPtr _updater;
+ void (ObserverUpdater::*_fn)();
+ };
+ _metrics->addUpdater("Connection", new Updater(updater, &ObserverUpdater::updateConnectionObservers));
+ _metrics->addUpdater("Thread", new Updater(updater, &ObserverUpdater::updateThreadObservers));
+ _metrics->addUpdater("ThreadPoolThread", new Updater(updater, &ObserverUpdater::updateThreadPoolThreadObservers));
+}
+
+ConnectionObserverPtr
+ObserverResolverI::getConnectionObserver(const ConnectionObserverPtr& old, const ConnectionPtr& con)
+{
+ return _connections.getObserver(_metrics->getMatching("Connection", ConnectionMetricsHelper(con)), old.get());
+}
+
+ObjectObserverPtr
+ObserverResolverI::getThreadObserver(const ObjectObserverPtr&, const string&, const string&)
+{
+ return 0;
+}
+
+ThreadPoolThreadObserverPtr
+ObserverResolverI::getThreadPoolThreadObserver(const ThreadPoolThreadObserverPtr&, const string&, const string&)
+{
+ return 0;
+}
+
+RequestObserverPtr
+ObserverResolverI::getInvocationObserver(const RequestObserverPtr&, const ObjectPrx&, const string&)
+{
+ return 0;
+}
+
+RequestObserverPtr
+ObserverResolverI::getDispatchObserver(const RequestObserverPtr&, const ObjectPtr&, const Current&)
+{
+ return 0;
+}
diff --git a/cpp/src/Ice/MetricsObserverI.h b/cpp/src/Ice/MetricsObserverI.h
new file mode 100644
index 00000000000..7b5b4abbf5a
--- /dev/null
+++ b/cpp/src/Ice/MetricsObserverI.h
@@ -0,0 +1,179 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#ifndef ICE_METRICSSTATS_I_H
+#define ICE_METRICSSTATS_I_H
+
+#include <Ice/Observer.h>
+#include <Ice/Metrics.h>
+
+#include <Ice/MetricsFunctional.h>
+
+namespace IceMetrics
+{
+
+class MetricsAdminI;
+typedef IceUtil::Handle<MetricsAdminI> MetricsAdminIPtr;
+
+template<class T> class ObjectObserverI : virtual public Ice::ObjectObserver
+{
+public:
+
+ typedef T Type;
+ typedef IceUtil::Handle<T> PtrType;
+ typedef std::vector<PtrType> SeqType;
+
+ ObjectObserverI(const std::vector<MetricsObjectPtr>& objects)
+ {
+ for(std::vector<MetricsObjectPtr>::const_iterator p = objects.begin(); p != objects.end(); ++p)
+ {
+ _objects.push_back(PtrType::dynamicCast(*p));
+ }
+ }
+
+ virtual void
+ attach()
+ {
+ for(typename SeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
+ {
+ ++(*p)->total;
+ ++(*p)->current;
+ }
+ }
+
+ virtual void
+ detach()
+ {
+ forEach(decMember(&MetricsObject::current));
+ }
+
+ template<typename Function> void
+ forEach(Function func)
+ {
+ for(typename SeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
+ {
+ func(*p);
+ }
+ }
+
+ void
+ update(ObjectObserverI* old)
+ {
+ typename SeqType::const_iterator q = old->_objects.begin();
+ for(typename SeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
+ {
+ if(*p != *q)
+ {
+ ++(*p)->total;
+ ++(*p)->current;
+ }
+ else
+ {
+ ++q;
+ }
+ }
+ }
+
+private:
+
+ SeqType _objects;
+};
+
+class ConnectionObserverI : public Ice::ConnectionObserver, public ObjectObserverI<ConnectionMetricsObject>
+{
+public:
+
+ ConnectionObserverI(const std::vector<MetricsObjectPtr>& objects) : ObjectObserverI(objects)
+ {
+ }
+
+ virtual void attach();
+ virtual void detach();
+
+ virtual void stateChanged(Ice::ConnectionState, Ice::ConnectionState);
+ virtual void sentBytes(Ice::Int, Ice::Long);
+ virtual void receivedBytes(Ice::Int, Ice::Long);
+};
+
+template<typename T>
+class ObjectObserverResolver
+{
+public:
+
+ typedef IceUtil::Handle<T> TPtr;
+ typedef std::map<std::vector<MetricsObjectPtr>, TPtr> ObserverMap;
+
+ template<typename S> T*
+ getObserver(const std::vector<MetricsMap::Entry>& objects, S* oldObserver)
+ {
+ if(objects.empty())
+ {
+ return 0;
+ }
+
+ typename ObserverMap::const_iterator p = _stats.find(objects);
+ if(p == _stats.end())
+ {
+ p = _stats.insert(make_pair(objects, new T(objects))).first;
+ }
+
+ T* newObserver = p->second.get();
+ if(oldObserver && static_cast<S*>(newObserver) != oldObserver)
+ {
+ newObserver->update(dynamic_cast<T*>(oldObserver));
+ }
+ return newObserver;
+ }
+
+private:
+
+ ObserverMap _stats;
+};
+
+class ObjectObserverUpdater : public IceUtil::Shared
+{
+public:
+
+ virtual void update() = 0;
+};
+typedef IceUtil::Handle<ObjectObserverUpdater> ObjectObserverUpdaterPtr;
+
+class ObserverResolverI : public Ice::ObserverResolver
+{
+public:
+
+ ObserverResolverI(const MetricsAdminIPtr&);
+
+ virtual void setObserverUpdater(const Ice::ObserverUpdaterPtr&);
+
+ virtual Ice::ConnectionObserverPtr
+ getConnectionObserver(const Ice::ConnectionObserverPtr&, const Ice::ConnectionPtr&);
+
+ virtual Ice::ObjectObserverPtr
+ getThreadObserver(const Ice::ObjectObserverPtr&, const std::string&, const std::string&);
+
+ virtual Ice::ThreadPoolThreadObserverPtr
+ getThreadPoolThreadObserver(const Ice::ThreadPoolThreadObserverPtr&, const std::string&, const std::string&);
+
+ virtual Ice::RequestObserverPtr
+ getInvocationObserver(const Ice::RequestObserverPtr&, const Ice::ObjectPrx&, const std::string&);
+
+ virtual Ice::RequestObserverPtr
+ getDispatchObserver(const Ice::RequestObserverPtr&, const Ice::ObjectPtr&, const Ice::Current&);
+
+private:
+
+ const MetricsAdminIPtr _metrics;
+
+ ObjectObserverResolver<ConnectionObserverI> _connections;
+};
+
+}
+
+#endif
diff --git a/slice/Ice/Metrics.ice b/slice/Ice/Metrics.ice
new file mode 100644
index 00000000000..fffc0ad3e43
--- /dev/null
+++ b/slice/Ice/Metrics.ice
@@ -0,0 +1,112 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+module IceMetrics
+{
+
+class MetricsObject
+{
+ string id;
+ int total;
+ int current;
+};
+
+sequence<MetricsObject> MetricsObjectSeq;
+dictionary<string, MetricsObjectSeq> MetricsObjectSeqDict;
+dictionary<string, MetricsObjectSeqDict> MetricsObjectSeqDictDict;
+
+dictionary<string, string> NameValueDict;
+
+exception UnknownMetricsView
+{
+};
+
+interface MetricsAdmin
+{
+ /**
+ *
+ * Get the metrics objects for the given metrics view. This
+ * returns a map of metric maps for each metrics class configured
+ * with the view.
+ *
+ **/
+ MetricsObjectSeqDict getMetrics(string view)
+ throws UnknownMetricsView;
+
+ /**
+ *
+ * Get all the metrics objects for the given metrics class. This
+ * returns a map of metrics view.
+ *
+ **/
+ MetricsObjectSeqDictDict getAllMetrics();
+
+ /**
+ *
+ * Monitor a new metrics class with the given view.
+ *
+ **/
+ void addClassToView(string name, string cl, string groupBy, NameValueDict acceptFilter, NameValueDict rejectFilter);
+
+ /**
+ *
+ * No longer monitor the given metrics class with the given view.
+ *
+ **/
+ void removeClassFromView(string name, string cl)
+ throws UnknownMetricsView;
+};
+
+local interface MetricsViewProvider
+{
+ void addView(string name, string value, NameValueDict accept, NameValueDict reject);
+ void removeView(string name);
+ MetricsObjectSeq getView(string view);
+};
+
+local interface MetricsViewProviderManager
+{
+ void add(string cl, MetricsViewProvider provider);
+ void remove(string cl);
+};
+
+class ThreadPoolThreadMetricsObject extends MetricsObject
+{
+ int inUse;
+ int inUseForIO;
+};
+
+class RequestMetricsObject extends MetricsObject
+{
+ int responseOK;
+ int responseUserException;
+ int responseSytemException;
+ int failure;
+
+ long marshalTime;
+ long unmarshalTime;
+ long time;
+};
+
+class ConnectionMetricsObject extends MetricsObject
+{
+ int initializing;
+ int holding;
+ int active;
+ int closing;
+ int closed;
+
+ long receivedBytes;
+ long receivedTime;
+
+ long sentBytes;
+ long sentTime;
+};
+
+}; \ No newline at end of file
diff --git a/slice/Ice/Observer.ice b/slice/Ice/Observer.ice
new file mode 100644
index 00000000000..76237f35104
--- /dev/null
+++ b/slice/Ice/Observer.ice
@@ -0,0 +1,84 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#pragma once
+
+[["cpp:header-ext:h"]]
+
+#include <Ice/ConnectionF.ice>
+#include <Ice/Current.ice>
+
+module Ice
+{
+
+local interface ObjectObserver
+{
+ void attach();
+ void detach();
+};
+
+enum ThreadState
+{
+ ThreadIddle,
+ ThreadInUse,
+ ThreadInUseForIO,
+};
+
+local interface ThreadPoolThreadObserver extends ObjectObserver
+{
+ void stateChanged(ThreadState oldState, ThreadState newState);
+};
+
+local interface RequestObserver extends ObjectObserver
+{
+ void reponseOK();
+ void reponseUserException();
+ void reponseSystemException();
+ void failure();
+ void marshalTime(long time);
+ void unmarshalTime(long time);
+ void callTime(long time);
+};
+
+enum ConnectionState
+{
+ ConnectionStateInitializing,
+ ConnectionStateHolding,
+ ConnectionStateActive,
+ ConnectionStateClosing,
+ ConnectionStateClosed
+};
+
+local interface ConnectionObserver extends ObjectObserver
+{
+ void stateChanged(ConnectionState oldState, ConnectionState newState);
+ void sentBytes(int num, long duration);
+ void receivedBytes(int num, long duration);
+};
+
+local interface ObserverUpdater
+{
+ void updateConnectionObservers();
+ void updateThreadObservers();
+ void updateThreadPoolThreadObservers();
+};
+
+local interface ObserverResolver
+{
+ void setObserverUpdater(ObserverUpdater refresher);
+
+ ConnectionObserver getConnectionObserver(ConnectionObserver old, Connection con);
+ ObjectObserver getThreadObserver(ObjectObserver old, string parent, string id);
+ ThreadPoolThreadObserver getThreadPoolThreadObserver(ThreadPoolThreadObserver old, string parent, string id);
+ RequestObserver getInvocationObserver(RequestObserver old, Object* prx, string operation);
+ RequestObserver getDispatchObserver(RequestObserver old, Object obj, Current c);
+};
+
+};
+
diff --git a/slice/Ice/ObserverF.ice b/slice/Ice/ObserverF.ice
new file mode 100644
index 00000000000..5118ffcbaf1
--- /dev/null
+++ b/slice/Ice/ObserverF.ice
@@ -0,0 +1,25 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2012 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#pragma once
+
+[["cpp:header-ext:h"]]
+
+module Ice
+{
+
+local interface ObjectObserver;
+local interface ThreadPoolThreadObserver;
+local interface Requesttats;
+local interface ConnectionObserver;
+
+local interface ObserverResolver;
+
+};
+
diff --git a/slice/Ice/Stats.ice b/slice/Ice/Stats.ice
index 072fd5c6bd2..aa33879c66c 100644
--- a/slice/Ice/Stats.ice
+++ b/slice/Ice/Stats.ice
@@ -47,6 +47,6 @@ local interface Stats
**/
void bytesReceived(string protocol, int num);
};
-
+
};