diff options
author | Benoit Foucher <benoit@zeroc.com> | 2013-01-07 10:16:41 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2013-01-07 10:16:41 +0100 |
commit | 2c9e1d532eb36bce9167a0ec656808c0a653730a (patch) | |
tree | 58781d4f42f58a8bdc03a0c8a1a5fd3160fa44fb /cpp/src | |
parent | Fixed (ICE-5058) - WinRT port Platform::AccessDeniedException (diff) | |
download | ice-2c9e1d532eb36bce9167a0ec656808c0a653730a.tar.bz2 ice-2c9e1d532eb36bce9167a0ec656808c0a653730a.tar.xz ice-2c9e1d532eb36bce9167a0ec656808c0a653730a.zip |
Fixed ICE-4933: Added size and replySize fields to dispatch and remote invocation metrics, support for requestId attribute
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 18 | ||||
-rw-r--r-- | cpp/src/Ice/Incoming.cpp | 19 | ||||
-rw-r--r-- | cpp/src/Ice/IncomingAsync.cpp | 1 | ||||
-rw-r--r-- | cpp/src/Ice/InstrumentationI.cpp | 50 | ||||
-rw-r--r-- | cpp/src/Ice/InstrumentationI.h | 13 | ||||
-rw-r--r-- | cpp/src/Ice/Outgoing.cpp | 5 | ||||
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 9 |
7 files changed, 91 insertions, 24 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 8a548df8a1d..d2ef8eadba9 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -559,8 +559,6 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) assert(_state > StateNotValidated); assert(_state < StateClosing); - out->attachRemoteObserver(initConnectionInfo(), _endpoint); - // // Ensure the message isn't bigger than what we can send with the // transport. @@ -591,6 +589,8 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) #endif } + out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, os->b.size() - headerSize - 4); + // // Send the message. If it can't be sent without blocking the message is added // to _sendStreams and it will be sent by the selector thread. @@ -638,8 +638,6 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b assert(_state > StateNotValidated); assert(_state < StateClosing); - out->__attachRemoteObserver(initConnectionInfo(), _endpoint); - // // Ensure the message isn't bigger than what we can send with the // transport. @@ -670,6 +668,8 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b #endif } + out->__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, os->b.size() - headerSize - 4); + AsyncStatus status = AsyncStatusQueued; try { @@ -958,8 +958,6 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) _exception->ice_throw(); } - out->attachRemoteObserver(initConnectionInfo(), _endpoint); - if(_batchRequestNum == 0) { out->sent(false); @@ -975,6 +973,9 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) #else copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif + + out->attachRemoteObserver(initConnectionInfo(), _endpoint, _batchStream.b.size() - headerSize - 4); + _batchStream.swap(*out->os()); // @@ -1018,8 +1019,6 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) _exception->ice_throw(); } - outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint); - if(_batchRequestNum == 0) { AsyncStatus status = AsyncStatusSent; @@ -1039,6 +1038,9 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) #else copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif + + outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint, 0, _batchStream.b.size() - headerSize - 4); + _batchStream.swap(*outAsync->__getOs()); // diff --git a/cpp/src/Ice/Incoming.cpp b/cpp/src/Ice/Incoming.cpp index 04dc96a838b..699ece0d3d5 100644 --- a/cpp/src/Ice/Incoming.cpp +++ b/cpp/src/Ice/Incoming.cpp @@ -230,6 +230,7 @@ IceInternal::IncomingBase::__servantLocatorFinished() _os.startWriteEncaps(_current.encoding, DefaultFormat); _os.write(ex); _os.endWriteEncaps(); + _observer.reply(_os.b.size() - headerSize - 4); _connection->sendResponse(&_os, _compress); } else @@ -322,6 +323,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) _os.write(rfe->operation, false); + _observer.reply(_os.b.size() - headerSize - 4); _connection->sendResponse(&_os, _compress); } else @@ -389,6 +391,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) _os.write(str.str(), false); } + _observer.reply(_os.b.size() - headerSize - 4); _connection->sendResponse(&_os, _compress); } else @@ -415,6 +418,8 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) ostringstream str; str << "std::exception: " << exc.what(); _os.write(str.str(), false); + + _observer.reply(_os.b.size() - headerSize - 4); _connection->sendResponse(&_os, _compress); } else @@ -448,6 +453,7 @@ IceInternal::IncomingBase::__handleException() _os.write(replyUnknownException); string reason = "unknown c++ exception"; _os.write(reason, false); + _observer.reply(_os.b.size() - headerSize - 4); _connection->sendResponse(&_os, _compress); } else @@ -542,6 +548,8 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre { _is = stream; + BasicStream::Container::iterator start = _is->i; + // // Read the current. // @@ -582,8 +590,13 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre const CommunicatorObserverPtr& obsv = _is->instance()->initializationData().observer; if(obsv) - { - _observer.attach(obsv->getDispatchObserver(_current)); + { + // Read the parameter encapsulation size. + Ice::Int sz; + _is->read(sz); + _is->i -= 4; + + _observer.attach(obsv->getDispatchObserver(_current, _is->i - start + sz)); } // @@ -621,6 +634,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre _os.startWriteEncaps(encoding, DefaultFormat); _os.write(ex); _os.endWriteEncaps(); + _observer.reply(_os.b.size() - headerSize - 4); _connection->sendResponse(&_os, _compress); } else @@ -711,6 +725,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre if(_response) { + _observer.reply(_os.b.size() - headerSize - 4); _connection->sendResponse(&_os, _compress); } else diff --git a/cpp/src/Ice/IncomingAsync.cpp b/cpp/src/Ice/IncomingAsync.cpp index e9f8c628e00..30fd7de967d 100644 --- a/cpp/src/Ice/IncomingAsync.cpp +++ b/cpp/src/Ice/IncomingAsync.cpp @@ -200,6 +200,7 @@ IceInternal::IncomingAsync::__response() if(_response) { + _observer.reply(_os.b.size() - headerSize - 4); _connection->sendResponse(&_os, _compress); } else diff --git a/cpp/src/Ice/InstrumentationI.cpp b/cpp/src/Ice/InstrumentationI.cpp index 319d0a2842e..437e23ade9c 100644 --- a/cpp/src/Ice/InstrumentationI.cpp +++ b/cpp/src/Ice/InstrumentationI.cpp @@ -209,13 +209,14 @@ public: add("identity", &DispatchHelper::getIdentity); add("facet", &DispatchHelper::getCurrent, &Current::facet); add("mode", &DispatchHelper::getMode); + add("requestId", &DispatchHelper::getCurrent, &Current::requestId); setDefault(&DispatchHelper::resolve); } }; static Attributes attributes; - DispatchHelper(const Current& current) : _current(current) + DispatchHelper(const Current& current, int size) : _current(current), _size(size) { } @@ -224,6 +225,11 @@ public: return attributes(this, attribute); } + virtual void initMetrics(const DispatchMetricsPtr& v) const + { + v->size += _size; + } + string resolve(const string& attribute) const { if(attribute.compare(0, 8, "context.") == 0) @@ -308,6 +314,7 @@ public: private: const Current& _current; + const int _size; mutable string _id; mutable EndpointInfoPtr _endpointInfo; }; @@ -466,7 +473,7 @@ private: InvocationHelper::Attributes InvocationHelper::attributes; -class RemoteInvocationHelper : public MetricsHelperT<Metrics> +class RemoteInvocationHelper : public MetricsHelperT<RemoteMetrics> { public: @@ -478,13 +485,14 @@ public: { add("parent", &RemoteInvocationHelper::getParent); add("id", &RemoteInvocationHelper::getId); + add("requestId", &RemoteInvocationHelper::_requestId); addConnectionAttributes<RemoteInvocationHelper>(*this); } }; static Attributes attributes; - RemoteInvocationHelper(const ConnectionInfoPtr& con, const EndpointPtr& endpt) : - _connectionInfo(con), _endpoint(endpt) + RemoteInvocationHelper(const ConnectionInfoPtr& con, const EndpointPtr& endpt, int requestId, int size) : + _connectionInfo(con), _endpoint(endpt), _requestId(requestId), _size(size) { } @@ -493,6 +501,11 @@ public: return attributes(this, attribute); } + virtual void initMetrics(const RemoteMetricsPtr& v) const + { + v->size += _size; + } + const string& getId() const { @@ -546,6 +559,8 @@ private: const ConnectionInfoPtr& _connectionInfo; const EndpointPtr& _endpoint; + const int _requestId; + const int _size; mutable string _id; mutable EndpointInfoPtr _endpointInfo; }; @@ -692,6 +707,18 @@ DispatchObserverI::userException() } void +DispatchObserverI::reply(Int size) +{ + forEach(add(&DispatchMetrics::replySize, size)); +} + +void +RemoteObserverI::reply(Int size) +{ + forEach(add(&RemoteMetrics::replySize, size)); +} + +void InvocationObserverI::retried() { forEach(inc(&InvocationMetrics::retry)); @@ -703,12 +730,15 @@ InvocationObserverI::userException() forEach(inc(&InvocationMetrics::userException)); } -ObserverPtr -InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection, const EndpointPtr& endpoint) +RemoteObserverPtr +InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection, + const EndpointPtr& endpoint, + int requestId, + int size) { try { - return getObserver<ObserverI>("Remote", RemoteInvocationHelper(connection, endpoint)); + return getObserver<RemoteObserverI>("Remote", RemoteInvocationHelper(connection, endpoint, requestId, size)); } catch(const exception&) { @@ -725,7 +755,7 @@ CommunicatorObserverI::CommunicatorObserverI(const IceInternal::MetricsAdminIPtr _connects(metrics, "ConnectionEstablishment"), _endpointLookups(metrics, "EndpointLookup") { - _invocations.registerSubMap<Metrics>("Remote", &InvocationMetrics::remotes); + _invocations.registerSubMap<RemoteMetrics>("Remote", &InvocationMetrics::remotes); } void @@ -832,13 +862,13 @@ CommunicatorObserverI::getInvocationObserver(const ObjectPrx& proxy, const strin } DispatchObserverPtr -CommunicatorObserverI::getDispatchObserver(const Current& current) +CommunicatorObserverI::getDispatchObserver(const Current& current, int size) { if(_dispatch.isEnabled()) { try { - return _dispatch.getObserver(DispatchHelper(current)); + return _dispatch.getObserver(DispatchHelper(current, size)); } catch(const exception& ex) { diff --git a/cpp/src/Ice/InstrumentationI.h b/cpp/src/Ice/InstrumentationI.h index 035aed387cc..7ddbbeedb72 100644 --- a/cpp/src/Ice/InstrumentationI.h +++ b/cpp/src/Ice/InstrumentationI.h @@ -72,6 +72,14 @@ class DispatchObserverI : public Ice::Instrumentation::DispatchObserver, public: virtual void userException(); + + virtual void reply(Ice::Int); +}; + +class RemoteObserverI : public Ice::Instrumentation::RemoteObserver, + public IceMX::ObserverT<IceMX::RemoteMetrics> +{ + virtual void reply(Ice::Int); }; class InvocationObserverI : public Ice::Instrumentation::InvocationObserver, @@ -83,7 +91,8 @@ public: virtual void userException(); - virtual Ice::Instrumentation::ObserverPtr getRemoteObserver(const Ice::ConnectionInfoPtr&, const Ice::EndpointPtr&); + virtual Ice::Instrumentation::RemoteObserverPtr getRemoteObserver(const Ice::ConnectionInfoPtr&, + const Ice::EndpointPtr&, Ice::Int, Ice::Int); }; class ICE_API CommunicatorObserverI : public Ice::Instrumentation::CommunicatorObserver @@ -113,7 +122,7 @@ public: const std::string&, const Ice::Context&); - virtual Ice::Instrumentation::DispatchObserverPtr getDispatchObserver(const Ice::Current&); + virtual Ice::Instrumentation::DispatchObserverPtr getDispatchObserver(const Ice::Current&, Ice::Int); const IceInternal::MetricsAdminIPtr& getMetricsAdmin() const; diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index 85e15a53509..118dc25612c 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -379,9 +379,14 @@ IceInternal::Outgoing::finished(BasicStream& is) assert(_handler->getReference()->getMode() == Reference::ModeTwoway); // Can only be called for twoways. assert(_state <= StateInProgress); + if(_remoteObserver) + { + _remoteObserver->reply(is.b.size() - headerSize - 4); + } _remoteObserver.detach(); _is.swap(is); + Byte replyStatus; _is.read(replyStatus); diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index eee5b3215c2..890e33a4cb9 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -617,6 +617,10 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); assert(!_exception.get() && !(_state & Done)); + if(_remoteObserver) + { + _remoteObserver->reply(is.b.size() - headerSize - 4); + } _remoteObserver.detach(); if(_timerTaskConnection) @@ -1054,9 +1058,10 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt _outAsync->check(false); } - virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt) + virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt, + Ice::Int requestId, Ice::Int sz) { - _remoteObserver.attach(_observer.getRemoteObserver(connection, endpt)); + _remoteObserver.attach(_observer.getRemoteObserver(connection, endpt, requestId, sz)); } private: |