summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2013-01-07 10:16:41 +0100
committerBenoit Foucher <benoit@zeroc.com>2013-01-07 10:16:41 +0100
commit2c9e1d532eb36bce9167a0ec656808c0a653730a (patch)
tree58781d4f42f58a8bdc03a0c8a1a5fd3160fa44fb /cpp/src
parentFixed (ICE-5058) - WinRT port Platform::AccessDeniedException (diff)
downloadice-2c9e1d532eb36bce9167a0ec656808c0a653730a.tar.bz2
ice-2c9e1d532eb36bce9167a0ec656808c0a653730a.tar.xz
ice-2c9e1d532eb36bce9167a0ec656808c0a653730a.zip
Fixed ICE-4933: Added size and replySize fields to dispatch and remote invocation metrics, support for requestId attribute
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp18
-rw-r--r--cpp/src/Ice/Incoming.cpp19
-rw-r--r--cpp/src/Ice/IncomingAsync.cpp1
-rw-r--r--cpp/src/Ice/InstrumentationI.cpp50
-rw-r--r--cpp/src/Ice/InstrumentationI.h13
-rw-r--r--cpp/src/Ice/Outgoing.cpp5
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp9
7 files changed, 91 insertions, 24 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 8a548df8a1d..d2ef8eadba9 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -559,8 +559,6 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
assert(_state > StateNotValidated);
assert(_state < StateClosing);
- out->attachRemoteObserver(initConnectionInfo(), _endpoint);
-
//
// Ensure the message isn't bigger than what we can send with the
// transport.
@@ -591,6 +589,8 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
#endif
}
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, os->b.size() - headerSize - 4);
+
//
// Send the message. If it can't be sent without blocking the message is added
// to _sendStreams and it will be sent by the selector thread.
@@ -638,8 +638,6 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
assert(_state > StateNotValidated);
assert(_state < StateClosing);
- out->__attachRemoteObserver(initConnectionInfo(), _endpoint);
-
//
// Ensure the message isn't bigger than what we can send with the
// transport.
@@ -670,6 +668,8 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
#endif
}
+ out->__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, os->b.size() - headerSize - 4);
+
AsyncStatus status = AsyncStatusQueued;
try
{
@@ -958,8 +958,6 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
_exception->ice_throw();
}
- out->attachRemoteObserver(initConnectionInfo(), _endpoint);
-
if(_batchRequestNum == 0)
{
out->sent(false);
@@ -975,6 +973,9 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out)
#else
copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#endif
+
+ out->attachRemoteObserver(initConnectionInfo(), _endpoint, _batchStream.b.size() - headerSize - 4);
+
_batchStream.swap(*out->os());
//
@@ -1018,8 +1019,6 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
_exception->ice_throw();
}
- outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint);
-
if(_batchRequestNum == 0)
{
AsyncStatus status = AsyncStatusSent;
@@ -1039,6 +1038,9 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync)
#else
copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
#endif
+
+ outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint, 0, _batchStream.b.size() - headerSize - 4);
+
_batchStream.swap(*outAsync->__getOs());
//
diff --git a/cpp/src/Ice/Incoming.cpp b/cpp/src/Ice/Incoming.cpp
index 04dc96a838b..699ece0d3d5 100644
--- a/cpp/src/Ice/Incoming.cpp
+++ b/cpp/src/Ice/Incoming.cpp
@@ -230,6 +230,7 @@ IceInternal::IncomingBase::__servantLocatorFinished()
_os.startWriteEncaps(_current.encoding, DefaultFormat);
_os.write(ex);
_os.endWriteEncaps();
+ _observer.reply(_os.b.size() - headerSize - 4);
_connection->sendResponse(&_os, _compress);
}
else
@@ -322,6 +323,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
_os.write(rfe->operation, false);
+ _observer.reply(_os.b.size() - headerSize - 4);
_connection->sendResponse(&_os, _compress);
}
else
@@ -389,6 +391,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
_os.write(str.str(), false);
}
+ _observer.reply(_os.b.size() - headerSize - 4);
_connection->sendResponse(&_os, _compress);
}
else
@@ -415,6 +418,8 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
ostringstream str;
str << "std::exception: " << exc.what();
_os.write(str.str(), false);
+
+ _observer.reply(_os.b.size() - headerSize - 4);
_connection->sendResponse(&_os, _compress);
}
else
@@ -448,6 +453,7 @@ IceInternal::IncomingBase::__handleException()
_os.write(replyUnknownException);
string reason = "unknown c++ exception";
_os.write(reason, false);
+ _observer.reply(_os.b.size() - headerSize - 4);
_connection->sendResponse(&_os, _compress);
}
else
@@ -542,6 +548,8 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
{
_is = stream;
+ BasicStream::Container::iterator start = _is->i;
+
//
// Read the current.
//
@@ -582,8 +590,13 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
const CommunicatorObserverPtr& obsv = _is->instance()->initializationData().observer;
if(obsv)
- {
- _observer.attach(obsv->getDispatchObserver(_current));
+ {
+ // Read the parameter encapsulation size.
+ Ice::Int sz;
+ _is->read(sz);
+ _is->i -= 4;
+
+ _observer.attach(obsv->getDispatchObserver(_current, _is->i - start + sz));
}
//
@@ -621,6 +634,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
_os.startWriteEncaps(encoding, DefaultFormat);
_os.write(ex);
_os.endWriteEncaps();
+ _observer.reply(_os.b.size() - headerSize - 4);
_connection->sendResponse(&_os, _compress);
}
else
@@ -711,6 +725,7 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
if(_response)
{
+ _observer.reply(_os.b.size() - headerSize - 4);
_connection->sendResponse(&_os, _compress);
}
else
diff --git a/cpp/src/Ice/IncomingAsync.cpp b/cpp/src/Ice/IncomingAsync.cpp
index e9f8c628e00..30fd7de967d 100644
--- a/cpp/src/Ice/IncomingAsync.cpp
+++ b/cpp/src/Ice/IncomingAsync.cpp
@@ -200,6 +200,7 @@ IceInternal::IncomingAsync::__response()
if(_response)
{
+ _observer.reply(_os.b.size() - headerSize - 4);
_connection->sendResponse(&_os, _compress);
}
else
diff --git a/cpp/src/Ice/InstrumentationI.cpp b/cpp/src/Ice/InstrumentationI.cpp
index 319d0a2842e..437e23ade9c 100644
--- a/cpp/src/Ice/InstrumentationI.cpp
+++ b/cpp/src/Ice/InstrumentationI.cpp
@@ -209,13 +209,14 @@ public:
add("identity", &DispatchHelper::getIdentity);
add("facet", &DispatchHelper::getCurrent, &Current::facet);
add("mode", &DispatchHelper::getMode);
+ add("requestId", &DispatchHelper::getCurrent, &Current::requestId);
setDefault(&DispatchHelper::resolve);
}
};
static Attributes attributes;
- DispatchHelper(const Current& current) : _current(current)
+ DispatchHelper(const Current& current, int size) : _current(current), _size(size)
{
}
@@ -224,6 +225,11 @@ public:
return attributes(this, attribute);
}
+ virtual void initMetrics(const DispatchMetricsPtr& v) const
+ {
+ v->size += _size;
+ }
+
string resolve(const string& attribute) const
{
if(attribute.compare(0, 8, "context.") == 0)
@@ -308,6 +314,7 @@ public:
private:
const Current& _current;
+ const int _size;
mutable string _id;
mutable EndpointInfoPtr _endpointInfo;
};
@@ -466,7 +473,7 @@ private:
InvocationHelper::Attributes InvocationHelper::attributes;
-class RemoteInvocationHelper : public MetricsHelperT<Metrics>
+class RemoteInvocationHelper : public MetricsHelperT<RemoteMetrics>
{
public:
@@ -478,13 +485,14 @@ public:
{
add("parent", &RemoteInvocationHelper::getParent);
add("id", &RemoteInvocationHelper::getId);
+ add("requestId", &RemoteInvocationHelper::_requestId);
addConnectionAttributes<RemoteInvocationHelper>(*this);
}
};
static Attributes attributes;
- RemoteInvocationHelper(const ConnectionInfoPtr& con, const EndpointPtr& endpt) :
- _connectionInfo(con), _endpoint(endpt)
+ RemoteInvocationHelper(const ConnectionInfoPtr& con, const EndpointPtr& endpt, int requestId, int size) :
+ _connectionInfo(con), _endpoint(endpt), _requestId(requestId), _size(size)
{
}
@@ -493,6 +501,11 @@ public:
return attributes(this, attribute);
}
+ virtual void initMetrics(const RemoteMetricsPtr& v) const
+ {
+ v->size += _size;
+ }
+
const string&
getId() const
{
@@ -546,6 +559,8 @@ private:
const ConnectionInfoPtr& _connectionInfo;
const EndpointPtr& _endpoint;
+ const int _requestId;
+ const int _size;
mutable string _id;
mutable EndpointInfoPtr _endpointInfo;
};
@@ -692,6 +707,18 @@ DispatchObserverI::userException()
}
void
+DispatchObserverI::reply(Int size)
+{
+ forEach(add(&DispatchMetrics::replySize, size));
+}
+
+void
+RemoteObserverI::reply(Int size)
+{
+ forEach(add(&RemoteMetrics::replySize, size));
+}
+
+void
InvocationObserverI::retried()
{
forEach(inc(&InvocationMetrics::retry));
@@ -703,12 +730,15 @@ InvocationObserverI::userException()
forEach(inc(&InvocationMetrics::userException));
}
-ObserverPtr
-InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection, const EndpointPtr& endpoint)
+RemoteObserverPtr
+InvocationObserverI::getRemoteObserver(const ConnectionInfoPtr& connection,
+ const EndpointPtr& endpoint,
+ int requestId,
+ int size)
{
try
{
- return getObserver<ObserverI>("Remote", RemoteInvocationHelper(connection, endpoint));
+ return getObserver<RemoteObserverI>("Remote", RemoteInvocationHelper(connection, endpoint, requestId, size));
}
catch(const exception&)
{
@@ -725,7 +755,7 @@ CommunicatorObserverI::CommunicatorObserverI(const IceInternal::MetricsAdminIPtr
_connects(metrics, "ConnectionEstablishment"),
_endpointLookups(metrics, "EndpointLookup")
{
- _invocations.registerSubMap<Metrics>("Remote", &InvocationMetrics::remotes);
+ _invocations.registerSubMap<RemoteMetrics>("Remote", &InvocationMetrics::remotes);
}
void
@@ -832,13 +862,13 @@ CommunicatorObserverI::getInvocationObserver(const ObjectPrx& proxy, const strin
}
DispatchObserverPtr
-CommunicatorObserverI::getDispatchObserver(const Current& current)
+CommunicatorObserverI::getDispatchObserver(const Current& current, int size)
{
if(_dispatch.isEnabled())
{
try
{
- return _dispatch.getObserver(DispatchHelper(current));
+ return _dispatch.getObserver(DispatchHelper(current, size));
}
catch(const exception& ex)
{
diff --git a/cpp/src/Ice/InstrumentationI.h b/cpp/src/Ice/InstrumentationI.h
index 035aed387cc..7ddbbeedb72 100644
--- a/cpp/src/Ice/InstrumentationI.h
+++ b/cpp/src/Ice/InstrumentationI.h
@@ -72,6 +72,14 @@ class DispatchObserverI : public Ice::Instrumentation::DispatchObserver,
public:
virtual void userException();
+
+ virtual void reply(Ice::Int);
+};
+
+class RemoteObserverI : public Ice::Instrumentation::RemoteObserver,
+ public IceMX::ObserverT<IceMX::RemoteMetrics>
+{
+ virtual void reply(Ice::Int);
};
class InvocationObserverI : public Ice::Instrumentation::InvocationObserver,
@@ -83,7 +91,8 @@ public:
virtual void userException();
- virtual Ice::Instrumentation::ObserverPtr getRemoteObserver(const Ice::ConnectionInfoPtr&, const Ice::EndpointPtr&);
+ virtual Ice::Instrumentation::RemoteObserverPtr getRemoteObserver(const Ice::ConnectionInfoPtr&,
+ const Ice::EndpointPtr&, Ice::Int, Ice::Int);
};
class ICE_API CommunicatorObserverI : public Ice::Instrumentation::CommunicatorObserver
@@ -113,7 +122,7 @@ public:
const std::string&,
const Ice::Context&);
- virtual Ice::Instrumentation::DispatchObserverPtr getDispatchObserver(const Ice::Current&);
+ virtual Ice::Instrumentation::DispatchObserverPtr getDispatchObserver(const Ice::Current&, Ice::Int);
const IceInternal::MetricsAdminIPtr& getMetricsAdmin() const;
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp
index 85e15a53509..118dc25612c 100644
--- a/cpp/src/Ice/Outgoing.cpp
+++ b/cpp/src/Ice/Outgoing.cpp
@@ -379,9 +379,14 @@ IceInternal::Outgoing::finished(BasicStream& is)
assert(_handler->getReference()->getMode() == Reference::ModeTwoway); // Can only be called for twoways.
assert(_state <= StateInProgress);
+ if(_remoteObserver)
+ {
+ _remoteObserver->reply(is.b.size() - headerSize - 4);
+ }
_remoteObserver.detach();
_is.swap(is);
+
Byte replyStatus;
_is.read(replyStatus);
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index eee5b3215c2..890e33a4cb9 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -617,6 +617,10 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!_exception.get() && !(_state & Done));
+ if(_remoteObserver)
+ {
+ _remoteObserver->reply(is.b.size() - headerSize - 4);
+ }
_remoteObserver.detach();
if(_timerTaskConnection)
@@ -1054,9 +1058,10 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt
_outAsync->check(false);
}
- virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt)
+ virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt,
+ Ice::Int requestId, Ice::Int sz)
{
- _remoteObserver.attach(_observer.getRemoteObserver(connection, endpt));
+ _remoteObserver.attach(_observer.getRemoteObserver(connection, endpt, requestId, sz));
}
private: