summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Incoming.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Incoming.cpp')
-rw-r--r--cpp/src/Ice/Incoming.cpp187
1 files changed, 185 insertions, 2 deletions
diff --git a/cpp/src/Ice/Incoming.cpp b/cpp/src/Ice/Incoming.cpp
index 15e49a61608..4ed98bfc157 100644
--- a/cpp/src/Ice/Incoming.cpp
+++ b/cpp/src/Ice/Incoming.cpp
@@ -21,6 +21,7 @@
#include <Ice/LoggerUtil.h>
#include <Ice/Protocol.h>
#include <Ice/ReplyStatus.h>
+#include <Ice/Observer.h>
#include <IceUtil/StringUtil.h>
using namespace std;
@@ -49,6 +50,7 @@ IceInternal::IncomingBase::IncomingBase(Instance* instance, ConnectionI* connect
IceInternal::IncomingBase::IncomingBase(IncomingBase& in) :
_current(in._current), // copy
+ _observer(in._observer),
_os(in._os.instance(), Ice::currentProtocolEncoding),
_interceptorAsyncCallbackQueue(in._interceptorAsyncCallbackQueue) // copy
{
@@ -82,6 +84,15 @@ IceInternal::IncomingBase::__adopt(IncomingBase& other)
BasicStream*
IncomingBase::__startWriteParams()
{
+ if(_observer)
+ {
+ if(_watch.isStarted()) // isStarted == false if dispatched with AMD
+ {
+ _observer->userTime(_watch.stop());
+ }
+ _watch.start();
+ }
+
if(_response)
{
assert(_os.b.size() == headerSize + 4); // Reply status position.
@@ -108,11 +119,39 @@ IncomingBase::__endWriteParams(bool ok)
*(_os.b.begin() + headerSize + 4) = ok ? replyOK : replyUserException; // Reply status position.
_os.endWriteEncaps();
}
+
+ if(_observer)
+ {
+ if(_response)
+ {
+ if(ok)
+ {
+ _observer->responseOK();
+ }
+ else
+ {
+ _observer->responseUserException();
+ }
+ }
+ _observer->marshalTime(_watch.stop());
+ }
}
void
IncomingBase::__writeEmptyParams()
{
+ if(_observer)
+ {
+ if(_watch.isStarted()) // isStarted == false if dispatched with AMD
+ {
+ _observer->userTime(_watch.stop());
+ }
+ if(_response)
+ {
+ _observer->responseOK();
+ }
+ }
+
if(_response)
{
assert(_os.b.size() == headerSize + 4); // Reply status position.
@@ -125,6 +164,25 @@ IncomingBase::__writeEmptyParams()
void
IncomingBase::__writeParamEncaps(const Byte* v, Ice::Int sz, bool ok)
{
+ if(_observer)
+ {
+ if(_watch.isStarted()) // isStarted == false if dispatched with AMD
+ {
+ _observer->userTime(_watch.stop());
+ }
+ if(_response)
+ {
+ if(ok)
+ {
+ _observer->responseOK();
+ }
+ else
+ {
+ _observer->responseUserException();
+ }
+ }
+ }
+
if(_response)
{
assert(_os.b.size() == headerSize + 4); // Reply status position.
@@ -139,6 +197,11 @@ IncomingBase::__writeParamEncaps(const Byte* v, Ice::Int sz, bool ok)
_os.writeEncaps(v, sz);
}
}
+
+ if(_observer && _locator)
+ {
+ _watch.start();
+ }
}
void
@@ -157,7 +220,7 @@ IceInternal::IncomingBase::__warning(const Exception& ex) const
Ice::IPConnectionInfoPtr ipConnInfo = Ice::IPConnectionInfoPtr::dynamicCast(connInfo);
if(ipConnInfo)
{
- out << "\nremote host: " << ipConnInfo->remoteAddress + " remote port: " << ipConnInfo->remotePort;
+ out << "\nremote host: " << ipConnInfo->remoteAddress << " remote port: " << ipConnInfo->remotePort;
}
}
}
@@ -178,7 +241,7 @@ IceInternal::IncomingBase::__warning(const string& msg) const
Ice::IPConnectionInfoPtr ipConnInfo = Ice::IPConnectionInfoPtr::dynamicCast(connInfo);
if(ipConnInfo)
{
- out << "\nremote host: " << ipConnInfo->remoteAddress + " remote port: " << ipConnInfo->remotePort;
+ out << "\nremote host: " << ipConnInfo->remoteAddress << " remote port: " << ipConnInfo->remotePort;
}
}
}
@@ -189,18 +252,35 @@ IceInternal::IncomingBase::__servantLocatorFinished()
assert(_locator && _servant);
try
{
+ if(_observer)
+ {
+ assert(!_watch.isStarted());
+ _watch.start();
+ }
_locator->finished(_current, _servant, _cookie);
+ if(_observer)
+ {
+ _observer->finishedTime(_watch.stop());
+ }
return true;
}
catch(const UserException& ex)
{
assert(_connection);
+ if(_observer)
+ {
+ _observer->finishedTime(_watch.stop());
+ }
//
// The operation may have already marshaled a reply; we must overwrite that reply.
//
if(_response)
{
+ if(_observer)
+ {
+ _observer->responseUserException();
+ }
_os.b.resize(headerSize + 4); // Reply status position.
_os.write(replyUserException);
_os.startWriteEncaps(_current.encoding);
@@ -213,14 +293,27 @@ IceInternal::IncomingBase::__servantLocatorFinished()
_connection->sendNoResponse();
}
+ if(_observer)
+ {
+ _observer->detach();
+ _observer = 0;
+ }
_connection = 0;
}
catch(const std::exception& ex)
{
+ if(_observer)
+ {
+ _observer->finishedTime(_watch.stop());
+ }
__handleException(ex);
}
catch(...)
{
+ if(_observer)
+ {
+ _observer->finishedTime(_watch.stop());
+ }
__handleException();
}
return false;
@@ -258,6 +351,11 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
if(_response)
{
+ if(_observer)
+ {
+ _observer->responseRequestFailedException();
+ }
+
_os.b.resize(headerSize + 4); // Reply status position.
if(dynamic_cast<ObjectNotExistException*>(rfe))
{
@@ -309,6 +407,11 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
if(_response)
{
+ if(_observer)
+ {
+ _observer->responseUnknownException();
+ }
+
_os.b.resize(headerSize + 4); // Reply status position.
if(const UnknownLocalException* ule = dynamic_cast<const UnknownLocalException*>(&exc))
{
@@ -358,6 +461,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
str << *ex;
_os.write(str.str(), false);
}
+
_connection->sendResponse(&_os, _compress);
}
else
@@ -374,6 +478,11 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
if(_response)
{
+ if(_observer)
+ {
+ _observer->responseUnknownException();
+ }
+
_os.b.resize(headerSize + 4); // Reply status position.
_os.write(replyUnknownException);
ostringstream str;
@@ -387,6 +496,12 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc)
}
}
+ if(_observer)
+ {
+ _observer->detach();
+ _observer = 0;
+ }
+
_connection = 0;
}
@@ -402,6 +517,10 @@ IceInternal::IncomingBase::__handleException()
if(_response)
{
+ if(_observer)
+ {
+ _observer->responseUnknownException();
+ }
_os.b.resize(headerSize + 4); // Reply status position.
_os.write(replyUnknownException);
string reason = "unknown c++ exception";
@@ -413,6 +532,11 @@ IceInternal::IncomingBase::__handleException()
_connection->sendNoResponse();
}
+ if(_observer)
+ {
+ _observer->detach();
+ _observer = 0;
+ }
_connection = 0;
}
@@ -537,6 +661,17 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
_current.ctx.insert(_current.ctx.end(), pr);
}
+ const ObserverResolverPtr& resolver = _is->instance()->initializationData().observerResolver;
+ if(resolver)
+ {
+ _observer = resolver->getDispatchObserver(_current);
+ if(_observer)
+ {
+ _observer->attach();
+ _watch.start();
+ }
+ }
+
//
// Don't put the code above into the try block below. Exceptions
// in the code above are considered fatal, and must propagate to
@@ -556,12 +691,27 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
if(_locator)
{
+ if(_observer)
+ {
+ _watch.start();
+ }
+
try
{
_servant = _locator->locate(_current, _cookie);
+
+ if(_observer)
+ {
+ _observer->locateTime(_watch.stop());
+ }
}
catch(const UserException& ex)
{
+ if(_observer)
+ {
+ _observer->locateTime(_watch.stop());
+ }
+
Ice::EncodingVersion encoding = _is->skipEncaps(); // Required for batch requests.
if(_response)
@@ -577,17 +727,35 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
_connection->sendNoResponse();
}
+ if(_observer)
+ {
+ _observer->userException();
+ if(_response)
+ {
+ _observer->response();
+ }
+ _observer->detach();
+ _observer = 0;
+ }
_connection = 0;
return;
}
catch(const std::exception& ex)
{
+ if(_observer)
+ {
+ _observer->locateTime(_watch.stop());
+ }
_is->skipEncaps(); // Required for batch requests.
__handleException(ex);
return;
}
catch(...)
{
+ if(_observer)
+ {
+ _observer->locateTime(_watch.stop());
+ }
_is->skipEncaps(); // Required for batch requests.
__handleException();
return;
@@ -609,6 +777,11 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
//
// If this was an asynchronous dispatch, we're done here.
//
+ if(_observer)
+ {
+ _observer->userTime(_watch.stop());
+ _observer = 0; // Don't detach, it's now attached to the IncomingAsync
+ }
return;
}
@@ -669,6 +842,16 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre
_connection->sendNoResponse();
}
+ if(_observer)
+ {
+ if(_response)
+ {
+ _observer->response();
+ }
+ _observer->detach();
+ _observer = 0;
+ }
+
_connection = 0;
}