diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 8 | ||||
-rw-r--r-- | cpp/src/Ice/InstrumentationI.cpp | 38 | ||||
-rw-r--r-- | cpp/src/Ice/InstrumentationI.h | 17 | ||||
-rw-r--r-- | cpp/src/Ice/Outgoing.cpp | 18 | ||||
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 26 |
5 files changed, 60 insertions, 47 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 6db9bae4daf..896062a4d42 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -358,7 +358,7 @@ CollocatedRequestHandler::invokeRequest(Outgoing* out) } } - out->attachCollocatedObserver(requestId); + out->attachCollocatedObserver(_adapter, requestId); if(_reference->getInvocationTimeout() > 0) { @@ -389,7 +389,7 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync) } } - outAsync->__attachCollocatedObserver(requestId); + outAsync->__attachCollocatedObserver(_adapter, requestId); _adapter->getThreadPool()->execute(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, requestId, 1, false)); return AsyncStatusQueued; @@ -428,7 +428,7 @@ CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out) } } - out->attachCollocatedObserver(0); + out->attachCollocatedObserver(_adapter, 0); if(invokeNum > 0) { @@ -480,7 +480,7 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync) } } - outAsync->__attachCollocatedObserver(0); + outAsync->__attachCollocatedObserver(_adapter, 0); if(invokeNum > 0) { diff --git a/cpp/src/Ice/InstrumentationI.cpp b/cpp/src/Ice/InstrumentationI.cpp index 6ff70f8aafb..b0a06b1cf08 100644 --- a/cpp/src/Ice/InstrumentationI.cpp +++ b/cpp/src/Ice/InstrumentationI.cpp @@ -583,7 +583,7 @@ private: RemoteInvocationHelper::Attributes RemoteInvocationHelper::attributes; -class CollocatedInvocationHelper : public MetricsHelperT<RemoteMetrics> +class CollocatedInvocationHelper : public MetricsHelperT<CollocatedMetrics> { public: @@ -600,7 +600,8 @@ public: }; static Attributes attributes; - CollocatedInvocationHelper(int requestId, int size) : _requestId(requestId), _size(size) + CollocatedInvocationHelper(const Ice::ObjectAdapterPtr& adapter, int requestId, int size) : + _requestId(requestId), _size(size), _id(adapter->getName()) { } @@ -609,7 +610,7 @@ public: return attributes(this, attribute); } - virtual void initMetrics(const RemoteMetricsPtr& v) const + virtual void initMetrics(const CollocatedMetricsPtr& v) const { v->size += _size; } @@ -617,12 +618,6 @@ public: const string& getId() const { - if(_id.empty()) - { - ostringstream os; - os << _requestId; - _id = os.str(); - } return _id; } @@ -824,6 +819,16 @@ RemoteObserverI::reply(Int size) } void +CollocatedObserverI::reply(Int size) +{ + forEach(add(&CollocatedMetrics::replySize, size)); + if(_delegate) + { + _delegate->reply(size); + } +} + +void InvocationObserverI::retried() { forEach(inc(&InvocationMetrics::retry)); @@ -866,19 +871,19 @@ InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection, return 0; } -RemoteObserverPtr -InvocationObserverI::getCollocatedObserver(int requestId, int size) +CollocatedObserverPtr +InvocationObserverI::getCollocatedObserver(const Ice::ObjectAdapterPtr& adapter, int requestId, int size) { try { - RemoteObserverPtr delegate; + CollocatedObserverPtr delegate; if(_delegate) { - delegate = _delegate->getCollocatedObserver(requestId, size); + delegate = _delegate->getCollocatedObserver(adapter, requestId, size); } - return getObserverWithDelegate<RemoteObserverI>("Collocated", - CollocatedInvocationHelper(requestId, size), - delegate); + return getObserverWithDelegate<CollocatedObserverI>("Collocated", + CollocatedInvocationHelper(adapter, requestId, size), + delegate); } catch(const exception&) { @@ -899,6 +904,7 @@ CommunicatorObserverI::CommunicatorObserverI(const IceInternal::MetricsAdminIPtr _endpointLookups(metrics, "EndpointLookup") { _invocations.registerSubMap<RemoteMetrics>("Remote", &InvocationMetrics::remotes); + _invocations.registerSubMap<CollocatedMetrics>("Collocated", &InvocationMetrics::collocated); } void diff --git a/cpp/src/Ice/InstrumentationI.h b/cpp/src/Ice/InstrumentationI.h index cdc31e8106c..db515999f23 100644 --- a/cpp/src/Ice/InstrumentationI.h +++ b/cpp/src/Ice/InstrumentationI.h @@ -184,6 +184,14 @@ public: virtual void reply(Ice::Int); }; +class CollocatedObserverI : public ObserverWithDelegateT<IceMX::CollocatedMetrics, + Ice::Instrumentation::CollocatedObserver> +{ +public: + + virtual void reply(Ice::Int); +}; + class InvocationObserverI : public ObserverWithDelegateT<IceMX::InvocationMetrics, Ice::Instrumentation::InvocationObserver> { @@ -193,12 +201,11 @@ public: virtual void userException(); - virtual Ice::Instrumentation::RemoteObserverPtr getRemoteObserver(const Ice::ConnectionInfoPtr&, - const Ice::EndpointPtr&, - Ice::Int, - Ice::Int); + virtual Ice::Instrumentation::RemoteObserverPtr + getRemoteObserver(const Ice::ConnectionInfoPtr&, const Ice::EndpointPtr&, Ice::Int, Ice::Int); - virtual Ice::Instrumentation::RemoteObserverPtr getCollocatedObserver(Ice::Int, Ice::Int); + virtual Ice::Instrumentation::CollocatedObserverPtr + getCollocatedObserver(const Ice::ObjectAdapterPtr&, Ice::Int, Ice::Int); }; typedef ObserverWithDelegateT<IceMX::Metrics, Ice::Instrumentation::Observer> ObserverI; diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index 73778c9dafd..0e89e524a0a 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -279,7 +279,7 @@ IceInternal::Outgoing::sent() IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); if(_proxy->__reference()->getMode() != Reference::ModeTwoway) { - _remoteObserver.detach(); + _childObserver.detach(); _state = StateOK; } _sent = true; @@ -297,8 +297,8 @@ IceInternal::Outgoing::finished(const Exception& ex, bool sent) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); assert(_state <= StateInProgress); - _remoteObserver.failed(ex.ice_name()); - _remoteObserver.detach(); + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); _state = StateFailed; _exception.reset(ex.ice_clone()); @@ -314,11 +314,11 @@ IceInternal::Outgoing::finished(BasicStream& is) assert(_proxy->__reference()->getMode() == Reference::ModeTwoway); // Can only be called for twoways. assert(_state <= StateInProgress); - if(_remoteObserver) + if(_childObserver) { - _remoteObserver->reply(static_cast<Int>(is.b.size() - headerSize - 4)); + _childObserver->reply(static_cast<Int>(is.b.size() - headerSize - 4)); } - _remoteObserver.detach(); + _childObserver.detach(); _is.swap(is); @@ -615,7 +615,7 @@ void IceInternal::BatchOutgoing::sent() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _remoteObserver.detach(); + _childObserver.detach(); _sent = true; _monitor.notify(); @@ -631,8 +631,8 @@ void IceInternal::BatchOutgoing::finished(const Ice::Exception& ex, bool) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _remoteObserver.failed(ex.ice_name()); - _remoteObserver.detach(); + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); _exception.reset(ex.ice_clone()); _monitor.notify(); } diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 5a600e8fce2..3fb2c8ed08b 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -527,7 +527,7 @@ IceInternal::OutgoingAsync::__sent() assert(!(_state & Done)); if(_proxy->__reference()->getMode() != Reference::ModeTwoway) { - _remoteObserver.detach(); + _childObserver.detach(); if(!_callback || !_callback->__hasSentCallback()) { _observer.detach(); @@ -556,8 +556,8 @@ IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc, bool sent) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); assert(!(_state & Done)); - _remoteObserver.failed(exc.ice_name()); - _remoteObserver.detach(); + _childObserver.failed(exc.ice_name()); + _childObserver.detach(); if(_timeoutRequestHandler) { _instance->timer()->cancel(this); @@ -596,11 +596,11 @@ IceInternal::OutgoingAsync::__finished() assert(!_exception.get() && !(_state & Done)); assert(!_is.b.empty()); - if(_remoteObserver) + if(_childObserver) { - _remoteObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4)); + _childObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4)); } - _remoteObserver.detach(); + _childObserver.detach(); if(_timeoutRequestHandler) { @@ -854,7 +854,7 @@ IceInternal::BatchOutgoingAsync::__sent() assert(!_exception.get()); _state |= Done | OK | Sent; //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization. - _remoteObserver.detach(); + _childObserver.detach(); if(_timeoutRequestHandler) { _instance->timer()->cancel(this); @@ -878,8 +878,8 @@ IceInternal::BatchOutgoingAsync::__invokeSent() void IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc, bool) { - _remoteObserver.failed(exc.ice_name()); - _remoteObserver.detach(); + _childObserver.failed(exc.ice_name()); + _childObserver.detach(); if(_timeoutRequestHandler) { _instance->timer()->cancel(this); @@ -1021,7 +1021,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt virtual bool __sent() { - _remoteObserver.detach(); + _childObserver.detach(); _outAsync->check(false); return false; } @@ -1032,15 +1032,15 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt virtual void __finished(const Ice::Exception& ex, bool) { - _remoteObserver.failed(ex.ice_name()); - _remoteObserver.detach(); + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); _outAsync->check(false); } virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt, Ice::Int requestId, Ice::Int sz) { - _remoteObserver.attach(_observer.getRemoteObserver(connection, endpt, requestId, sz)); + _childObserver.attach(_observer.getRemoteObserver(connection, endpt, requestId, sz)); } private: |