diff options
author | Benoit Foucher <benoit@zeroc.com> | 2012-08-28 14:45:45 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2012-08-28 14:45:45 +0200 |
commit | 55210695a994c710a1fff1d627b5e08ec96229a5 (patch) | |
tree | db3729d0c09f92a7a62eaaa41870f507249940b7 /cpp | |
parent | Added IceGridGUI metrics reports (diff) | |
download | ice-55210695a994c710a1fff1d627b5e08ec96229a5.tar.bz2 ice-55210695a994c710a1fff1d627b5e08ec96229a5.tar.xz ice-55210695a994c710a1fff1d627b5e08ec96229a5.zip |
Simplifed Connection metrics, re-factoring
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 168 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 19 | ||||
-rw-r--r-- | cpp/src/Ice/Instance.cpp | 9 | ||||
-rw-r--r-- | cpp/src/Ice/MetricsAdminI.cpp | 92 | ||||
-rw-r--r-- | cpp/src/Ice/MetricsAdminI.h | 101 | ||||
-rw-r--r-- | cpp/src/Ice/MetricsObserverI.h | 83 | ||||
-rw-r--r-- | cpp/src/Ice/ObserverI.cpp | 56 | ||||
-rw-r--r-- | cpp/src/Ice/ObserverI.h | 4 | ||||
-rwxr-xr-x | cpp/test/Ice/custom/AllTests.cpp | 9 |
9 files changed, 274 insertions, 267 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index fb38a16bc8a..bd7bb4ba8a3 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -137,9 +137,9 @@ IceInternal::ConnectionReaper::add(const ConnectionIPtr& connection) { Lock sync(*this); _connections.push_back(connection); - if(connection->_observer.get()) + if(connection->_observer) { - connection->_observer.reset(0); + connection->_observer.detach(); } } @@ -150,47 +150,44 @@ IceInternal::ConnectionReaper::swapConnections(vector<ConnectionIPtr>& connectio _connections.swap(connections); } -Ice::ConnectionI::Observer::Observer(const BasicStream& readStream, const BasicStream& writeStream) : - _readStream(readStream), _writeStream(writeStream) +Ice::ConnectionI::Observer::Observer() : _readStreamPos(0), _writeStreamPos(0) { } void -Ice::ConnectionI::Observer::startRead() +Ice::ConnectionI::Observer::startRead(Ice::Byte* i) { - if(_readWatch.isStarted()) + if(_readStreamPos) { - assert(_readStream.i >= _readStreamPos); - _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop()); + _observer->receivedBytes(static_cast<int>(i - _readStreamPos)); } - _readStreamPos = _readStream.i; - _readWatch.start(); + _readStreamPos = i; } void -Ice::ConnectionI::Observer::finishRead() +Ice::ConnectionI::Observer::finishRead(Ice::Byte* i) { - assert(_readStream.i >= _readStreamPos); - _observer->receivedBytes(static_cast<int>(_readStream.i - _readStreamPos), _readWatch.stop()); + assert(i >= _readStreamPos); + _observer->receivedBytes(static_cast<int>(i - _readStreamPos)); + _readStreamPos = 0; } void -Ice::ConnectionI::Observer::startWrite() +Ice::ConnectionI::Observer::startWrite(Ice::Byte* i) { - if(_writeWatch.isStarted()) + if(_writeStreamPos) { - assert(_writeStream.i >= _writeStreamPos); - _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + _observer->sentBytes(static_cast<int>(i - _writeStreamPos)); } - _writeStreamPos = _writeStream.i; - _writeWatch.start(); + _writeStreamPos = i; } void -Ice::ConnectionI::Observer::finishWrite() +Ice::ConnectionI::Observer::finishWrite(Ice::Byte* i) { - assert(_writeStream.i >= _writeStreamPos); - _observer->sentBytes(static_cast<int>(_writeStream.i - _writeStreamPos), _writeWatch.stop()); + assert(i >= _writeStreamPos); + _observer->sentBytes(static_cast<int>(i - _writeStreamPos)); + _writeStreamPos = 0; } void @@ -515,22 +512,10 @@ Ice::ConnectionI::updateObserver() const CommunicatorObserverPtr& comObsv = _instance->initializationData().observer; assert(comObsv); - ConnectionObserverPtr obsv = comObsv->getConnectionObserver(info, - _endpoint->getInfo(), - connectionStateMap[static_cast<int>(_state)], - _observer.get() ? _observer->get() : 0); - if(obsv) - { - if(!_observer.get()) - { - _observer.reset(new Observer(_readStream, _writeStream)); - } - _observer->attach(obsv); - } - else - { - _observer.reset(0); - } + _observer.attach(comObsv->getConnectionObserver(info, + _endpoint->getInfo(), + connectionStateMap[static_cast<int>(_state)], + _observer.get())); } void @@ -1243,9 +1228,9 @@ Ice::ConnectionI::startAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { - if(_observer.get()) + if(_observer) { - _observer->startWrite(); + _observer.startWrite(_writeStream.i); } if(_transceiver->startWrite(_writeStream) && !_sendStreams.empty()) @@ -1256,9 +1241,9 @@ Ice::ConnectionI::startAsync(SocketOperation operation) } else if(operation & SocketOperationRead) { - if(_observer.get() && !_readHeader) + if(_observer && !_readHeader) { - _observer->startRead(); + _observer.startRead(_readStream.i); } _transceiver->startRead(_readStream); @@ -1279,17 +1264,17 @@ Ice::ConnectionI::finishAsync(SocketOperation operation) { if(operation & SocketOperationWrite) { - if(_observer.get()) + if(_observer) { - _observer->finishWrite(); + _observer.finishWrite(_writeStream.i); } _transceiver->finishWrite(_writeStream); } else if(operation & SocketOperationRead) { - if(_observer.get() && !_readHeader) + if(_observer && !_readHeader) { - _observer->finishRead(); + _observer.finishRead(_readStream.i); } _transceiver->finishRead(_readStream); } @@ -1337,9 +1322,9 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) { if(_writeStream.i != _writeStream.b.end()) { - if(_observer.get()) + if(_observer) { - _observer->startWrite(); + _observer.startWrite(_writeStream.i); } if(!_transceiver->write(_writeStream)) @@ -1349,9 +1334,9 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) return; } - if(_observer.get()) + if(_observer) { - _observer->finishWrite(); + _observer.finishWrite(_writeStream.i); } } assert(_writeStream.i == _writeStream.b.end()); @@ -1367,13 +1352,9 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) assert(_readStream.i == _readStream.b.end()); _readHeader = false; - if(_observer.get()) + if(_observer) { - // - // We can't measure the time to receive the header as it would - // include the wait time. - // - (*_observer)->receivedBytes(static_cast<int>(headerSize), 0); + _observer->receivedBytes(static_cast<int>(headerSize)); } ptrdiff_t pos = _readStream.i - _readStream.b.begin(); @@ -1430,9 +1411,9 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } else { - if(_observer.get()) + if(_observer) { - _observer->startRead(); + _observer.startRead(_readStream.i); } if(!_transceiver->read(_readStream)) @@ -1442,9 +1423,9 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) return; } - if(_observer.get()) + if(_observer) { - _observer->finishRead(); + _observer.finishRead(_readStream.i); } assert(_readStream.i == _readStream.b.end()); } @@ -2192,19 +2173,24 @@ Ice::ConnectionI::setState(State state) } } - if(_observer.get()) + if(_observer) { ConnectionState oldState = connectionStateMap[static_cast<int>(_state)]; ConnectionState newState = connectionStateMap[static_cast<int>(state)]; if(oldState != newState) { - (*_observer)->stateChanged(oldState, newState); + _observer->stateChanged(oldState, newState); } if(state == StateClosed && _exception.get()) { - if(!dynamic_cast<CloseConnectionException*>(_exception.get())) + if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || + dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || + dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || + dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || + dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || + (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing))) { - (*_observer)->failed(_exception->ice_name()); + _observer->failed(_exception->ice_name()); } } } @@ -2292,15 +2278,10 @@ Ice::ConnectionI::initialize(SocketOperation operation) _info->incoming = _connector == 0; _info->adapterName = _adapter ? _adapter->getName() : string(); - ConnectionObserverPtr obsv = comObsv->getConnectionObserver(_info, - _endpoint->getInfo(), - ConnectionStateValidating, - 0); - if(obsv) - { - _observer.reset(new Observer(_readStream, _writeStream)); - _observer->attach(obsv); - } + _observer.attach(comObsv->getConnectionObserver(_info, + _endpoint->getInfo(), + ConnectionStateValidating, + 0)); } // @@ -2333,9 +2314,9 @@ Ice::ConnectionI::validate(SocketOperation operation) traceSend(_writeStream, _logger, _traceLevels); } - if(_observer.get()) + if(_observer) { - _observer->startWrite(); + _observer.startWrite(_writeStream.i); } if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream)) @@ -2345,9 +2326,9 @@ Ice::ConnectionI::validate(SocketOperation operation) return false; } - if(_observer.get()) + if(_observer) { - _observer->finishWrite(); + _observer.finishWrite(_writeStream.i); } } else // The client side has the passive role for connection validation. @@ -2358,9 +2339,9 @@ Ice::ConnectionI::validate(SocketOperation operation) _readStream.i = _readStream.b.begin(); } - if(_observer.get()) + if(_observer) { - _observer->startRead(); + _observer.startRead(_readStream.i); } if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream)) @@ -2370,9 +2351,9 @@ Ice::ConnectionI::validate(SocketOperation operation) return false; } - if(_observer.get()) + if(_observer) { - _observer->finishRead(); + _observer.finishRead(_readStream.i); } assert(_readStream.i == _readStream.b.end()); @@ -2528,9 +2509,9 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb // // Send the message. // - if(_observer.get()) + if(_observer) { - _observer->startWrite(); + _observer.startWrite(_writeStream.i); } assert(_writeStream.i); if(_writeStream.i != _writeStream.b.end() && !_transceiver->write(_writeStream)) @@ -2539,9 +2520,9 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb scheduleTimeout(SocketOperationWrite, _endpoint->timeout()); return; } - if(_observer.get()) + if(_observer) { - _observer->finishWrite(); + _observer.finishWrite(_writeStream.i); } } } @@ -2611,8 +2592,17 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // // Send the message without blocking. // + if(_observer) + { + _observer.startWrite(stream.i); + } if(_transceiver->write(stream)) { + if(_observer) + { + _observer.finishWrite(stream.i); + } + AsyncStatus status = AsyncStatusSent; if(message.sent(this, false)) { @@ -2664,8 +2654,16 @@ Ice::ConnectionI::sendMessage(OutgoingMessage& message) // // Send the message without blocking. // + if(_observer) + { + _observer.startWrite(message.stream->i); + } if(_transceiver->write(*message.stream)) { + if(_observer) + { + _observer.finishWrite(message.stream->i); + } AsyncStatus status = AsyncStatusSent; if(message.sent(this, false)) { diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 57fd34d3e5f..1ede520d520 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -69,22 +69,17 @@ class ICE_API ConnectionI : public Connection, public IceInternal::EventHandler, { public: - Observer(const IceInternal::BasicStream&, const IceInternal::BasicStream&); + Observer(); - void startRead(); - void finishRead(); - void startWrite(); - void finishWrite(); + void startRead(Ice::Byte*); + void finishRead(Ice::Byte*); + void startWrite(Ice::Byte*); + void finishWrite(Ice::Byte*); private: - Ice::Byte* _writeStreamPos; - IceUtilInternal::StopWatch _writeWatch; Ice::Byte* _readStreamPos; - IceUtilInternal::StopWatch _readWatch; - - const IceInternal::BasicStream& _readStream; - const IceInternal::BasicStream& _writeStream; + Ice::Byte* _writeStreamPos; }; public: @@ -363,7 +358,7 @@ private: bool _readHeader; IceInternal::BasicStream _writeStream; - std::auto_ptr<Observer> _observer; + Observer _observer; int _dispatchCount; diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index d4c83e6f980..5dda22e333c 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -1076,15 +1076,16 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Initi _adminFacets.insert(FacetMap::value_type("Properties", new PropertiesAdminI(_initData.properties))); _adminFacets.insert(FacetMap::value_type("Process", new ProcessI(communicator))); + IceMX::MetricsAdminIPtr admin = new IceMX::MetricsAdminI(_initData.properties); + _adminFacets.insert(FacetMap::value_type("MetricsAdmin", admin)); // - // Register the metrics admin plugin only if the user didn't already set an + // Setup the communicator observer only the metrics admin plugin only if the user didn't already set an // Ice observer resovler. // - if(!_initData.observer) + if(!_initData.observer && + (_adminFacetFilter.empty() || _adminFacetFilter.find("MetricsAdmin") != _adminFacetFilter.end())) { - IceMX::MetricsAdminIPtr admin = new IceMX::MetricsAdminI(_initData.properties); - _adminFacets.insert(FacetMap::value_type("MetricsAdmin", admin)); _initData.observer = new IceMX::CommunicatorObserverI(admin); } diff --git a/cpp/src/Ice/MetricsAdminI.cpp b/cpp/src/Ice/MetricsAdminI.cpp index c300a169cf3..b7b5a397ea2 100644 --- a/cpp/src/Ice/MetricsAdminI.cpp +++ b/cpp/src/Ice/MetricsAdminI.cpp @@ -72,6 +72,7 @@ MetricsMapI::RegExp::match(const MetricsHelper& helper) } MetricsMapI::MetricsMapI(const std::string& mapPrefix, const Ice::PropertiesPtr& properties) : + _properties(properties->getPropertiesForPrefix(mapPrefix)), _retain(properties->getPropertyAsIntWithDefault(mapPrefix + ".RetainDetached", 10)), _accept(parseRule(properties, mapPrefix + ".Accept")), _reject(parseRule(properties, mapPrefix + ".Reject")) @@ -205,7 +206,7 @@ MetricsMapI::getMatching(const MetricsHelper& helper) map<string, EntryPtr>::const_iterator p = _objects.find(key); if(p == _objects.end()) { - p = _objects.insert(make_pair(key, newEntry(helper.newMetrics(key)))).first; + p = _objects.insert(make_pair(key, newEntry(key))).first; } return p->second; } @@ -249,20 +250,52 @@ MetricsMapI::detached(Entry* entry) _detachedQueue.push_back(entry); } -MetricsViewI::MetricsViewI() +MetricsViewI::MetricsViewI(const string& name) : _name(name) { } void -MetricsViewI::add(const string& name, const MetricsMapIPtr& map) +MetricsViewI::update(const Ice::PropertiesPtr& properties, + const map<string, MetricsMapFactoryPtr>& factories, + set<string>& updatedMaps) { - _maps.insert(make_pair(name, map)); -} + // + // Add maps to views configured with the given map. + // + const string viewPrefix = "IceMX.Metrics." + _name; + const string mapsPrefix = viewPrefix + ".Map."; + bool hasMapsProperties = !properties->getPropertiesForPrefix(mapsPrefix).empty(); + for(map<string, MetricsMapFactoryPtr>::const_iterator p = factories.begin(); p != factories.end(); ++p) + { + const string& mapName = p->first; + string mapPrefix; + PropertyDict mapProps; + if(hasMapsProperties) + { + mapPrefix = mapsPrefix + mapName; + mapProps = properties->getPropertiesForPrefix(mapPrefix); + if(mapProps.empty()) + { + // This map isn't configured for this view. + updatedMaps.insert(mapName); + _maps.erase(mapName); + continue; + } + } + else + { + mapPrefix = viewPrefix; + mapProps = properties->getPropertiesForPrefix(mapPrefix); + } -void -MetricsViewI::remove(const string& cl) -{ - _maps.erase(cl); + map<string, MetricsMapIPtr>::iterator q = _maps.find(mapName); + if(q != _maps.end() && q->second->getMapProperties() == mapProps) + { + continue; // The map configuration didn't change. + } + _maps.insert(make_pair(mapName, p->second->create(mapPrefix, properties))); + updatedMaps.insert(mapName); + } } MetricsView @@ -299,9 +332,9 @@ MetricsViewI::getFailures(const string& mapName, const string& id) } MetricsMapI::EntryPtr -MetricsViewI::getMatching(const MetricsHelper& helper) const +MetricsViewI::getMatching(const string& mapName, const MetricsHelper& helper) const { - map<string, MetricsMapIPtr>::const_iterator p = _maps.find(helper.getMapName()); + map<string, MetricsMapIPtr>::const_iterator p = _maps.find(mapName); if(p != _maps.end()) { return p->second->getMatching(helper); @@ -331,15 +364,14 @@ MetricsAdminI::addUpdater(const string& mapName, const UpdaterPtr& updater) } void -MetricsAdminI::addFactory(const string& mapName, const MetricsMapFactoryPtr& factory) +MetricsAdminI::updateViews() { - _factories[mapName] = factory; - // // Add maps to views configured with the given map. // const string viewsPrefix = "IceMX.Metrics."; PropertyDict views = _properties->getPropertiesForPrefix(viewsPrefix); + set<string> updatedMaps; for(PropertyDict::const_iterator p = views.begin(); p != views.end(); ++p) { string viewName = p->first.substr(viewsPrefix.size()); @@ -351,41 +383,37 @@ MetricsAdminI::addFactory(const string& mapName, const MetricsMapFactoryPtr& fac if(_properties->getPropertyAsIntWithDefault(viewsPrefix + viewName + ".Disabled", 0) > 0) { - continue; // The view is disabled + // The view is disabled + _views.erase(viewName); + continue; } map<string, MetricsViewIPtr>::const_iterator q = _views.find(viewName); if(q == _views.end()) { - q = _views.insert(make_pair(viewName, new MetricsViewI())).first; + q = _views.insert(make_pair(viewName, new MetricsViewI(viewName))).first; } - MetricsViewIPtr view = q->second; - - const string mapsPrefix = viewsPrefix + viewName + ".Map."; - string mapPrefix = mapsPrefix + mapName; - if(_properties->getPropertiesForPrefix(mapPrefix).empty()) + q->second->update(_properties, _factories, updatedMaps); + } + + for(set<string>::const_iterator p = updatedMaps.begin(); p != updatedMaps.end(); ++p) + { + map<string, UpdaterPtr>::const_iterator q = _updaters.find(*p); + if(q != _updaters.end()) { - if(_properties->getPropertiesForPrefix(mapsPrefix).empty()) - { - mapPrefix = viewsPrefix + viewName; - } - else - { - continue; // This map isn't configured for this view. - } + q->second->update(); } - view->add(mapName, factory->create(mapPrefix, _properties)); } } vector<MetricsMapI::EntryPtr> -MetricsAdminI::getMatching(const MetricsHelper& helper) const +MetricsAdminI::getMatching(const string& mapName, const MetricsHelper& helper) const { Lock sync(*this); vector<MetricsMapI::EntryPtr> objects; for(map<string, MetricsViewIPtr>::const_iterator p = _views.begin(); p != _views.end(); ++p) { - MetricsMapI::EntryPtr e = p->second->getMatching(helper); + MetricsMapI::EntryPtr e = p->second->getMatching(mapName, helper); if(e) { objects.push_back(e); diff --git a/cpp/src/Ice/MetricsAdminI.h b/cpp/src/Ice/MetricsAdminI.h index b1984dadbb6..d59015463e5 100644 --- a/cpp/src/Ice/MetricsAdminI.h +++ b/cpp/src/Ice/MetricsAdminI.h @@ -121,7 +121,6 @@ public: clone() const { IceUtil::Mutex::Lock sync(*this); - // TODO: Fix ice_clone to use a co-variant type. return dynamic_cast<Metrics*>(_object->ice_clone().get()); } @@ -163,18 +162,23 @@ public: MetricsMap getMetrics() const; EntryPtr getMatching(const MetricsHelper&); -protected: - - virtual EntryPtr newEntry(const MetricsPtr& object) + const Ice::PropertyDict& getMapProperties() const { - return new Entry(this, object); + return _properties; } + virtual MetricsMapI* clone() const = 0; + +protected: + + virtual EntryPtr newEntry(const std::string&) = 0; + private: friend class Entry; void detached(Entry*); - + + const Ice::PropertyDict _properties; std::vector<std::string> _groupByAttributes; std::vector<std::string> _groupBySeparators; const int _retain; @@ -201,7 +205,7 @@ public: typedef MetricsType T; typedef IceInternal::Handle<MetricsType> TPtr; - typedef MetricsMap MetricsType::*SubMapMember; + typedef MetricsMap MetricsType::* SubMapMember; class EntryT : public MetricsMapI::Entry { @@ -251,12 +255,13 @@ public: }; typedef IceUtil::Handle<EntryT> EntryTPtr; - MetricsMapT(const std::string& mapPrefix, + MetricsMapT(const std::string& mapPrefix, const Ice::PropertiesPtr& properties, - const std::map<std::string, SubMapMember>& subMaps) : + const std::map<std::string, std::pair<SubMapMember, MetricsMapFactoryPtr> >& subMaps) : MetricsMapI(mapPrefix, properties) { - for(typename std::map<std::string, SubMapMember>::const_iterator p = subMaps.begin(); p != subMaps.end(); ++p) + typename std::map<std::string, std::pair<SubMapMember, MetricsMapFactoryPtr> >::const_iterator p; + for(p = subMaps.begin(); p != subMaps.end(); ++p) { const std::string subMapsPrefix = mapPrefix + ".Map."; std::string subMapPrefix = subMapsPrefix + p->first; @@ -272,51 +277,87 @@ public: } } _subMaps.insert(std::make_pair(p->first, - std::make_pair(new MetricsMapI(subMapPrefix, properties), p->second))); + std::make_pair(p->second.first, + p->second.second->create(subMapPrefix, properties)))); } } + MetricsMapT(const MetricsMapT& other) : MetricsMapI(other) + { + } + std::pair<MetricsMapIPtr, SubMapMember> createSubMap(const std::string& subMapName) { - typename std::map<std::string, std::pair<MetricsMapIPtr, SubMapMember> >::const_iterator p = + typename std::map<std::string, std::pair<SubMapMember, MetricsMapIPtr> >::const_iterator p = _subMaps.find(subMapName); if(p != _subMaps.end()) { - return std::make_pair(new MetricsMapI(*p->second.first), p->second.second); + return std::make_pair(p->second.second->clone(), p->second.first); } return std::make_pair(MetricsMapIPtr(), static_cast<SubMapMember>(0)); } protected: - virtual EntryPtr newEntry(const MetricsPtr& object) + virtual EntryPtr newEntry(const std::string& id) + { + TPtr t = new T(); + t->id = id; + t->failures = 0; + return new EntryT(this, t); + } + + virtual MetricsMapI* clone() const + { + return new MetricsMapT<MetricsType>(*this); + } + + std::map<std::string, std::pair<SubMapMember, MetricsMapIPtr> > _subMaps; +}; + +template<class MetricsType> class MetricsMapFactoryT : public MetricsMapFactory +{ +public: + + virtual MetricsMapIPtr + create(const std::string& mapPrefix, const Ice::PropertiesPtr& properties) { - return new EntryT(this, TPtr::dynamicCast(object)); + return new MetricsMapT<MetricsType>(mapPrefix, properties, _subMaps); } - std::map<std::string, std::pair<MetricsMapIPtr, SubMapMember> > _subMaps; + template<class SubMapMetricsType> void + registerSubMap(const std::string& subMap, MetricsMap MetricsType::* member) + { + _subMaps[subMap] = make_pair(member, new MetricsMapFactoryT<SubMapMetricsType>()); + } + +private: + + std::map<std::string, std::pair<MetricsMap MetricsType::*, MetricsMapFactoryPtr> > _subMaps; }; class MetricsViewI : public IceUtil::Shared { public: - MetricsViewI(); + MetricsViewI(const std::string&); - void add(const std::string&, const MetricsMapIPtr&); - void remove(const std::string&); + void update(const Ice::PropertiesPtr&, + const std::map<std::string, MetricsMapFactoryPtr>&, + std::set<std::string>&); MetricsView getMetrics(); MetricsFailuresSeq getFailures(const std::string&); MetricsFailures getFailures(const std::string&, const std::string&); - MetricsMapI::EntryPtr getMatching(const MetricsHelper&) const; + MetricsMapI::EntryPtr getMatching(const std::string&, const MetricsHelper&) const; std::vector<std::string> getMaps() const; private: + const std::string _name; std::map<std::string, MetricsMapIPtr> _maps; }; typedef IceUtil::Handle<MetricsViewI> MetricsViewIPtr; @@ -327,10 +368,26 @@ public: MetricsAdminI(const ::Ice::PropertiesPtr&); - std::vector<MetricsMapI::EntryPtr> getMatching(const MetricsHelper&) const; + std::vector<MetricsMapI::EntryPtr> getMatching(const std::string&, const MetricsHelper&) const; void addUpdater(const std::string&, const UpdaterPtr&); - void addFactory(const std::string&, const MetricsMapFactoryPtr&); + void updateViews(); + + template<class MetricsType> void + registerMap(const std::string& map) + { + _factories[map] = new MetricsMapFactoryT<MetricsType>(); + } + + template<class MemberMetricsType, class MetricsType> void + registerSubMap(const std::string& map, const std::string& subMap, MetricsMap MetricsType::* member) + { + std::map<std::string, MetricsMapFactoryPtr>::const_iterator p = _factories.find(map); + assert(p != _factories.end()); + + MetricsMapFactoryT<MetricsType>* factory = dynamic_cast<MetricsMapFactoryT<MetricsType>*>(p->second.get()); + factory->template registerSubMap<MemberMetricsType>(subMap, member); + } virtual Ice::StringSeq getMetricsViewNames(const ::Ice::Current&); virtual MetricsView getMetricsView(const std::string&, const ::Ice::Current&); diff --git a/cpp/src/Ice/MetricsObserverI.h b/cpp/src/Ice/MetricsObserverI.h index 3b0345c22ed..39970d678e9 100644 --- a/cpp/src/Ice/MetricsObserverI.h +++ b/cpp/src/Ice/MetricsObserverI.h @@ -26,25 +26,6 @@ class MetricsHelper public: virtual std::string operator()(const std::string&) const = 0; - - virtual MetricsPtr newMetrics(const std::string&) const = 0; - - const std::string& - getMapName() const - { - return _name; - } - -protected: - - MetricsHelper(const std::string& name, const std::string& subName) : _name(name), _subName(subName) - { - } - -private: - - std::string _name; - std::string _subName; }; class Updater : public IceUtil::Shared @@ -59,14 +40,6 @@ template<typename T> class MetricsHelperT : public MetricsHelper { public: - virtual MetricsPtr newMetrics(const std::string& id) const - { - MetricsPtr t = new T(); - t->id = id; - t->failures = 0; - return t; - } - virtual void initMetrics(const IceInternal::Handle<T>&) const { // To be overriden in specialization to initialize state attributes @@ -74,10 +47,6 @@ public: protected: - MetricsHelperT(const std::string& name, const std::string& subName = std::string()) : MetricsHelper(name, subName) - { - } - template<typename Helper> class AttributeResolverT { class Resolver @@ -349,12 +318,12 @@ public: } template<typename ObserverImpl, typename ObserverMetricsType> IceInternal::Handle<ObserverImpl> - getObserver(const MetricsHelperT<ObserverMetricsType>& helper) + getObserver(const std::string& mapName, const MetricsHelperT<ObserverMetricsType>& helper) { std::vector<MetricsMapI::EntryPtr> metricsObjects; for(typename SeqType::const_iterator p = _objects.begin(); p != _objects.end(); ++p) { - MetricsMapI::EntryPtr e = p->second->getMatching(helper.getMapName(), helper); + MetricsMapI::EntryPtr e = p->second->getMatching(mapName, helper); if(e) { metricsObjects.push_back(e); @@ -396,14 +365,15 @@ public: typedef IceUtil::Handle<ObserverImplType> ObserverImplPtrType; typedef typename ObserverImplType::Type MetricsType; - ObserverFactoryT(const MetricsAdminIPtr& metrics) : _metrics(metrics) + ObserverFactoryT(const MetricsAdminIPtr& metrics, const std::string& name) : _metrics(metrics), _name(name) { + _metrics->registerMap<MetricsType>(name); } ObserverImplPtrType getObserver(const MetricsHelperT<MetricsType>& helper) { - std::vector<MetricsMapI::EntryPtr> metricsObjects = _metrics->getMatching(helper); + std::vector<MetricsMapI::EntryPtr> metricsObjects = _metrics->getMatching(_name, helper); if(metricsObjects.empty()) { return 0; @@ -417,7 +387,7 @@ public: template<typename ObserverPtrType> ObserverImplPtrType getObserver(const MetricsHelperT<MetricsType>& helper, const ObserverPtrType& observer) { - std::vector<MetricsMapI::EntryPtr> metricsObjects = _metrics->getMatching(helper); + std::vector<MetricsMapI::EntryPtr> metricsObjects = _metrics->getMatching(_name, helper); if(metricsObjects.empty()) { return 0; @@ -432,49 +402,16 @@ public: return obsv; } - virtual MetricsMapFactoryPtr - newFactory() - { - class Factory : public MetricsMapFactory - { - public: - - virtual MetricsMapIPtr - create(const std::string& mapPrefix, const Ice::PropertiesPtr& properties) - { - return new MetricsMapI(mapPrefix, properties); - } - }; - return new Factory(); - } - - virtual MetricsMapFactoryPtr - newFactory(const std::map<std::string, MetricsMap MetricsType::*>& subMaps) + void + registerSubMap(const std::string& subMap, MetricsMap MetricsType::* member) { - class Factory : public MetricsMapFactory - { - public: - Factory(std::map<std::string, MetricsMap MetricsType::*> subMaps) : _subMaps(subMaps) - { - } - - virtual MetricsMapIPtr - create(const std::string& mapPrefix, const Ice::PropertiesPtr& properties) - { - return new MetricsMapT<MetricsType>(mapPrefix, properties, _subMaps); - } - - private: - - std::map<std::string, MetricsMap MetricsType::*> _subMaps; - }; - return new Factory(subMaps); + _metrics->registerSubMap<MetricsType>(_name, subMap, member); } private: - const std::string _name; const MetricsAdminIPtr _metrics; + const std::string _name; }; } diff --git a/cpp/src/Ice/ObserverI.cpp b/cpp/src/Ice/ObserverI.cpp index 57122288a18..ac954d8df25 100644 --- a/cpp/src/Ice/ObserverI.cpp +++ b/cpp/src/Ice/ObserverI.cpp @@ -154,7 +154,7 @@ public: static Attributes attributes; ConnectionHelper(const ConnectionInfoPtr& con, const EndpointInfoPtr& endpt, ConnectionState state) : - MetricsHelperT<ConnectionMetrics>("Connection"), _connection(con), _endpoint(endpt), _state(state) + _connection(con), _endpoint(endpt), _state(state) { } @@ -243,7 +243,7 @@ public: }; static Attributes attributes; - DispatchHelper(const Current& current) : MetricsHelperT<Metrics>("Dispatch"), _current(current) + DispatchHelper(const Current& current) : _current(current) { } @@ -344,7 +344,7 @@ public: static Attributes attributes; InvocationHelper(const Ice::ObjectPrx& proxy, const string& op, const Ice::Context& ctx = Ice::Context()) : - MetricsHelperT<InvocationMetrics>("Invocation"), _proxy(proxy), _operation(op), _context(ctx) + _proxy(proxy), _operation(op), _context(ctx) { } @@ -447,7 +447,7 @@ public: }; static Attributes attributes; - RemoteInvocationHelper(const ConnectionPtr& con) : MetricsHelperT<Metrics>("Remote"), _connection(con) + RemoteInvocationHelper(const ConnectionPtr& con) : _connection(con) { } @@ -520,8 +520,7 @@ public: }; static Attributes attributes; - ThreadHelper(const string& parent, const string& id, ThreadState state) : - MetricsHelperT<ThreadMetrics>("Thread"), _parent(parent), _id(id), _state(state) + ThreadHelper(const string& parent, const string& id, ThreadState state) : _parent(parent), _id(id), _state(state) { } @@ -564,9 +563,7 @@ public: }; static Attributes attributes; - EndpointHelper(const string& mapName, - const EndpointInfoPtr& endpt, - const string& id) : MetricsHelperT<Metrics>(mapName), _id(id), _endpoint(endpt) + EndpointHelper(const EndpointInfoPtr& endpt, const string& id) : _id(id), _endpoint(endpt) { } @@ -611,15 +608,15 @@ ConnectionObserverI::stateChanged(ConnectionState oldState, ConnectionState newS } void -ConnectionObserverI::sentBytes(Int num, Long duration) +ConnectionObserverI::sentBytes(Int num) { - forEach(chain(add(&ConnectionMetrics::sentBytes, num), add(&ConnectionMetrics::sentTime, duration))); + forEach(add(&ConnectionMetrics::sentBytes, num)); } void -ConnectionObserverI::receivedBytes(Int num, Long duration) +ConnectionObserverI::receivedBytes(Int num) { - forEach(chain(add(&ConnectionMetrics::receivedBytes, num), add(&ConnectionMetrics::receivedTime, duration))); + forEach(add(&ConnectionMetrics::receivedBytes, num)); } void @@ -641,27 +638,20 @@ InvocationObserverI::retried() ObserverPtr InvocationObserverI::getRemoteObserver(const Ice::ConnectionPtr& connection) { - return getObserver<ObserverI>(RemoteInvocationHelper(connection)); + return getObserver<ObserverI>("Remote", RemoteInvocationHelper(connection)); } CommunicatorObserverI::CommunicatorObserverI(const MetricsAdminIPtr& metrics) : _metrics(metrics), - _connections(metrics), - _dispatch(metrics), - _invocations(metrics), - _threads(metrics), - _connects(metrics), - _endpointLookups(metrics) + _connections(metrics, "Connection"), + _dispatch(metrics, "Dispatch"), + _invocations(metrics, "Invocation"), + _threads(metrics, "Thread"), + _connects(metrics, "ConnectionEstablishment"), + _endpointLookups(metrics, "EndpointLookup") { - metrics->addFactory("Connection", _connections.newFactory()); - metrics->addFactory("Thread", _threads.newFactory()); - metrics->addFactory("Dispatch", _dispatch.newFactory()); - metrics->addFactory("ConnectionEstablishment", _connects.newFactory()); - metrics->addFactory("EndpointLookup", _endpointLookups.newFactory()); - - map<string, MetricsMap InvocationMetrics::*> subMaps; - subMaps["Remote"] = &InvocationMetrics::remotes; - metrics->addFactory("Invocation", _invocations.newFactory(subMaps)); + _invocations.registerSubMap("Remote", &InvocationMetrics::remotes); + _metrics->updateViews(); } void @@ -674,13 +664,13 @@ CommunicatorObserverI::setObserverUpdater(const ObserverUpdaterPtr& updater) ObserverPtr CommunicatorObserverI::getConnectionEstablishmentObserver(const Ice::EndpointInfoPtr& endpt, const string& connector) { - return _connects.getObserver(EndpointHelper("ConnectionEstablishment", endpt, connector)); + return _connects.getObserver(EndpointHelper(endpt, connector)); } ObserverPtr CommunicatorObserverI::getEndpointLookupObserver(const Ice::EndpointInfoPtr& endpt, const string& endpoint) { - return _endpointLookups.getObserver(EndpointHelper("EndpointLookup", endpt, endpoint)); + return _endpointLookups.getObserver(EndpointHelper(endpt, endpoint)); } ConnectionObserverPtr @@ -708,9 +698,7 @@ CommunicatorObserverI::getInvocationObserver(const ObjectPrx& proxy, const strin } InvocationObserverPtr -CommunicatorObserverI::getInvocationObserverWithContext(const ObjectPrx& proxy, - const string& op, - const Context& ctx) +CommunicatorObserverI::getInvocationObserverWithContext(const ObjectPrx& proxy, const string& op, const Context& ctx) { return _invocations.getObserver(InvocationHelper(proxy, op, ctx)); } diff --git a/cpp/src/Ice/ObserverI.h b/cpp/src/Ice/ObserverI.h index 211954987eb..29fa7feef68 100644 --- a/cpp/src/Ice/ObserverI.h +++ b/cpp/src/Ice/ObserverI.h @@ -22,8 +22,8 @@ public: virtual void detach(); virtual void stateChanged(Ice::Instrumentation::ConnectionState, Ice::Instrumentation::ConnectionState); - virtual void sentBytes(Ice::Int, Ice::Long); - virtual void receivedBytes(Ice::Int, Ice::Long); + virtual void sentBytes(Ice::Int); + virtual void receivedBytes(Ice::Int); }; class ThreadObserverI : public Ice::Instrumentation::ThreadObserver, public ObserverT<ThreadMetrics> diff --git a/cpp/test/Ice/custom/AllTests.cpp b/cpp/test/Ice/custom/AllTests.cpp index 0f6853e19a5..c916024b858 100755 --- a/cpp/test/Ice/custom/AllTests.cpp +++ b/cpp/test/Ice/custom/AllTests.cpp @@ -1388,7 +1388,8 @@ public: const ::Test::ClassStructSeq& seq, const InParamPtr& cookie) { - pair< ::Test::ClassStructPtr, ::Test::ClassStructSeq> in = getIn(in, cookie); + pair< ::Test::ClassStructPtr, ::Test::ClassStructSeq> in; + in = getIn(in, cookie); test(ret == in.first); test(cs1 == in.first); test(seq == in.second); @@ -1436,7 +1437,8 @@ public: void throwExcept1(const Ice::AsyncResultPtr& result) { - wstring in = getIn(in, InParamPtr::dynamicCast(result->getCookie())); + wstring in; + in = getIn(in, InParamPtr::dynamicCast(result->getCookie())); try { Test1::WstringClassPrx t = Test1::WstringClassPrx::uncheckedCast(result->getProxy()); @@ -1472,7 +1474,8 @@ public: void throwExcept2(const Ice::AsyncResultPtr& result) { - wstring in = getIn(in, InParamPtr::dynamicCast(result->getCookie())); + wstring in; + in = getIn(in, InParamPtr::dynamicCast(result->getCookie())); try { Test2::WstringClassPrx t = Test2::WstringClassPrx::uncheckedCast(result->getProxy()); |