diff options
Diffstat (limited to 'cpp/src/Ice/Incoming.cpp')
-rw-r--r-- | cpp/src/Ice/Incoming.cpp | 187 |
1 files changed, 185 insertions, 2 deletions
diff --git a/cpp/src/Ice/Incoming.cpp b/cpp/src/Ice/Incoming.cpp index 15e49a61608..4ed98bfc157 100644 --- a/cpp/src/Ice/Incoming.cpp +++ b/cpp/src/Ice/Incoming.cpp @@ -21,6 +21,7 @@ #include <Ice/LoggerUtil.h> #include <Ice/Protocol.h> #include <Ice/ReplyStatus.h> +#include <Ice/Observer.h> #include <IceUtil/StringUtil.h> using namespace std; @@ -49,6 +50,7 @@ IceInternal::IncomingBase::IncomingBase(Instance* instance, ConnectionI* connect IceInternal::IncomingBase::IncomingBase(IncomingBase& in) : _current(in._current), // copy + _observer(in._observer), _os(in._os.instance(), Ice::currentProtocolEncoding), _interceptorAsyncCallbackQueue(in._interceptorAsyncCallbackQueue) // copy { @@ -82,6 +84,15 @@ IceInternal::IncomingBase::__adopt(IncomingBase& other) BasicStream* IncomingBase::__startWriteParams() { + if(_observer) + { + if(_watch.isStarted()) // isStarted == false if dispatched with AMD + { + _observer->userTime(_watch.stop()); + } + _watch.start(); + } + if(_response) { assert(_os.b.size() == headerSize + 4); // Reply status position. @@ -108,11 +119,39 @@ IncomingBase::__endWriteParams(bool ok) *(_os.b.begin() + headerSize + 4) = ok ? replyOK : replyUserException; // Reply status position. _os.endWriteEncaps(); } + + if(_observer) + { + if(_response) + { + if(ok) + { + _observer->responseOK(); + } + else + { + _observer->responseUserException(); + } + } + _observer->marshalTime(_watch.stop()); + } } void IncomingBase::__writeEmptyParams() { + if(_observer) + { + if(_watch.isStarted()) // isStarted == false if dispatched with AMD + { + _observer->userTime(_watch.stop()); + } + if(_response) + { + _observer->responseOK(); + } + } + if(_response) { assert(_os.b.size() == headerSize + 4); // Reply status position. @@ -125,6 +164,25 @@ IncomingBase::__writeEmptyParams() void IncomingBase::__writeParamEncaps(const Byte* v, Ice::Int sz, bool ok) { + if(_observer) + { + if(_watch.isStarted()) // isStarted == false if dispatched with AMD + { + _observer->userTime(_watch.stop()); + } + if(_response) + { + if(ok) + { + _observer->responseOK(); + } + else + { + _observer->responseUserException(); + } + } + } + if(_response) { assert(_os.b.size() == headerSize + 4); // Reply status position. @@ -139,6 +197,11 @@ IncomingBase::__writeParamEncaps(const Byte* v, Ice::Int sz, bool ok) _os.writeEncaps(v, sz); } } + + if(_observer && _locator) + { + _watch.start(); + } } void @@ -157,7 +220,7 @@ IceInternal::IncomingBase::__warning(const Exception& ex) const Ice::IPConnectionInfoPtr ipConnInfo = Ice::IPConnectionInfoPtr::dynamicCast(connInfo); if(ipConnInfo) { - out << "\nremote host: " << ipConnInfo->remoteAddress + " remote port: " << ipConnInfo->remotePort; + out << "\nremote host: " << ipConnInfo->remoteAddress << " remote port: " << ipConnInfo->remotePort; } } } @@ -178,7 +241,7 @@ IceInternal::IncomingBase::__warning(const string& msg) const Ice::IPConnectionInfoPtr ipConnInfo = Ice::IPConnectionInfoPtr::dynamicCast(connInfo); if(ipConnInfo) { - out << "\nremote host: " << ipConnInfo->remoteAddress + " remote port: " << ipConnInfo->remotePort; + out << "\nremote host: " << ipConnInfo->remoteAddress << " remote port: " << ipConnInfo->remotePort; } } } @@ -189,18 +252,35 @@ IceInternal::IncomingBase::__servantLocatorFinished() assert(_locator && _servant); try { + if(_observer) + { + assert(!_watch.isStarted()); + _watch.start(); + } _locator->finished(_current, _servant, _cookie); + if(_observer) + { + _observer->finishedTime(_watch.stop()); + } return true; } catch(const UserException& ex) { assert(_connection); + if(_observer) + { + _observer->finishedTime(_watch.stop()); + } // // The operation may have already marshaled a reply; we must overwrite that reply. // if(_response) { + if(_observer) + { + _observer->responseUserException(); + } _os.b.resize(headerSize + 4); // Reply status position. _os.write(replyUserException); _os.startWriteEncaps(_current.encoding); @@ -213,14 +293,27 @@ IceInternal::IncomingBase::__servantLocatorFinished() _connection->sendNoResponse(); } + if(_observer) + { + _observer->detach(); + _observer = 0; + } _connection = 0; } catch(const std::exception& ex) { + if(_observer) + { + _observer->finishedTime(_watch.stop()); + } __handleException(ex); } catch(...) { + if(_observer) + { + _observer->finishedTime(_watch.stop()); + } __handleException(); } return false; @@ -258,6 +351,11 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) if(_response) { + if(_observer) + { + _observer->responseRequestFailedException(); + } + _os.b.resize(headerSize + 4); // Reply status position. if(dynamic_cast<ObjectNotExistException*>(rfe)) { @@ -309,6 +407,11 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) if(_response) { + if(_observer) + { + _observer->responseUnknownException(); + } + _os.b.resize(headerSize + 4); // Reply status position. if(const UnknownLocalException* ule = dynamic_cast<const UnknownLocalException*>(&exc)) { @@ -358,6 +461,7 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) str << *ex; _os.write(str.str(), false); } + _connection->sendResponse(&_os, _compress); } else @@ -374,6 +478,11 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) if(_response) { + if(_observer) + { + _observer->responseUnknownException(); + } + _os.b.resize(headerSize + 4); // Reply status position. _os.write(replyUnknownException); ostringstream str; @@ -387,6 +496,12 @@ IceInternal::IncomingBase::__handleException(const std::exception& exc) } } + if(_observer) + { + _observer->detach(); + _observer = 0; + } + _connection = 0; } @@ -402,6 +517,10 @@ IceInternal::IncomingBase::__handleException() if(_response) { + if(_observer) + { + _observer->responseUnknownException(); + } _os.b.resize(headerSize + 4); // Reply status position. _os.write(replyUnknownException); string reason = "unknown c++ exception"; @@ -413,6 +532,11 @@ IceInternal::IncomingBase::__handleException() _connection->sendNoResponse(); } + if(_observer) + { + _observer->detach(); + _observer = 0; + } _connection = 0; } @@ -537,6 +661,17 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre _current.ctx.insert(_current.ctx.end(), pr); } + const ObserverResolverPtr& resolver = _is->instance()->initializationData().observerResolver; + if(resolver) + { + _observer = resolver->getDispatchObserver(_current); + if(_observer) + { + _observer->attach(); + _watch.start(); + } + } + // // Don't put the code above into the try block below. Exceptions // in the code above are considered fatal, and must propagate to @@ -556,12 +691,27 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre if(_locator) { + if(_observer) + { + _watch.start(); + } + try { _servant = _locator->locate(_current, _cookie); + + if(_observer) + { + _observer->locateTime(_watch.stop()); + } } catch(const UserException& ex) { + if(_observer) + { + _observer->locateTime(_watch.stop()); + } + Ice::EncodingVersion encoding = _is->skipEncaps(); // Required for batch requests. if(_response) @@ -577,17 +727,35 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre _connection->sendNoResponse(); } + if(_observer) + { + _observer->userException(); + if(_response) + { + _observer->response(); + } + _observer->detach(); + _observer = 0; + } _connection = 0; return; } catch(const std::exception& ex) { + if(_observer) + { + _observer->locateTime(_watch.stop()); + } _is->skipEncaps(); // Required for batch requests. __handleException(ex); return; } catch(...) { + if(_observer) + { + _observer->locateTime(_watch.stop()); + } _is->skipEncaps(); // Required for batch requests. __handleException(); return; @@ -609,6 +777,11 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre // // If this was an asynchronous dispatch, we're done here. // + if(_observer) + { + _observer->userTime(_watch.stop()); + _observer = 0; // Don't detach, it's now attached to the IncomingAsync + } return; } @@ -669,6 +842,16 @@ IceInternal::Incoming::invoke(const ServantManagerPtr& servantManager, BasicStre _connection->sendNoResponse(); } + if(_observer) + { + if(_response) + { + _observer->response(); + } + _observer->detach(); + _observer = 0; + } + _connection = 0; } |