summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/EndpointI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/EndpointI.cpp')
-rw-r--r--cpp/src/Ice/EndpointI.cpp112
1 files changed, 102 insertions, 10 deletions
diff --git a/cpp/src/Ice/EndpointI.cpp b/cpp/src/Ice/EndpointI.cpp
index 949baa82cdc..ab74afb660a 100644
--- a/cpp/src/Ice/EndpointI.cpp
+++ b/cpp/src/Ice/EndpointI.cpp
@@ -15,6 +15,7 @@
#include <IceUtil/MutexPtrLock.h>
using namespace std;
+using namespace Ice::Instrumentation;
using namespace IceInternal;
namespace
@@ -156,13 +157,14 @@ IceInternal::EndpointI::parseOption(const string& option, const string& arg, con
#ifndef ICE_OS_WINRT
IceInternal::EndpointHostResolver::EndpointHostResolver(const InstancePtr& instance) :
- IceUtil::Thread("Ice endpoint host resolver thread"),
+ IceUtil::Thread("Ice.HostResolver"),
_instance(instance),
_destroyed(false)
{
__setNoDelete(true);
try
{
+ updateObserver();
bool hasPriority = _instance->initializationData().properties->getProperty("Ice.ThreadPriority") != "";
int priority = _instance->initializationData().properties->getPropertyAsInt("Ice.ThreadPriority");
if(hasPriority)
@@ -185,6 +187,40 @@ IceInternal::EndpointHostResolver::EndpointHostResolver(const InstancePtr& insta
__setNoDelete(false);
}
+vector<ConnectorPtr>
+IceInternal::EndpointHostResolver::resolve(const string& host, int port, const EndpointIPtr& endpoint)
+{
+ vector<ConnectorPtr> connectors;
+ ObserverPtr observer;
+ const ObserverResolverPtr& resolver = _instance->initializationData().observerResolver;
+ if(resolver)
+ {
+ observer = resolver->getEndpointResolveObserver(endpoint->getInfo(), endpoint->toString());
+ if(observer)
+ {
+ observer->attach();
+ }
+ }
+
+ try
+ {
+ connectors = endpoint->connectors(getAddresses(host, port, _instance->protocolSupport(), true));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ if(observer)
+ {
+ observer->failed(ex.ice_name());
+ }
+ }
+
+ if(observer)
+ {
+ observer->detach();
+ }
+ return connectors;
+}
+
void
IceInternal::EndpointHostResolver::resolve(const string& host, int port, const EndpointIPtr& endpoint,
const EndpointI_connectorsPtr& callback)
@@ -216,6 +252,17 @@ IceInternal::EndpointHostResolver::resolve(const string& host, int port, const E
entry.port = port;
entry.endpoint = endpoint;
entry.callback = callback;
+
+ const ObserverResolverPtr& resolver = _instance->initializationData().observerResolver;
+ if(resolver)
+ {
+ entry.observer = resolver->getEndpointResolveObserver(endpoint->getInfo(), endpoint->toString());
+ if(entry.observer)
+ {
+ entry.observer->attach();
+ }
+ }
+
_queue.push_back(entry);
notify();
}
@@ -234,11 +281,10 @@ IceInternal::EndpointHostResolver::run()
{
while(true)
{
- ResolveEntry resolve;
-
+ ResolveEntry r;
+ ThreadObserverPtr threadObserver;
{
Lock sync(*this);
-
while(!_destroyed && _queue.empty())
{
wait();
@@ -249,29 +295,70 @@ IceInternal::EndpointHostResolver::run()
break;
}
- resolve = _queue.front();
+ r = _queue.front();
_queue.pop_front();
+ threadObserver = _observer;
}
+ const ProtocolSupport protocol = _instance->protocolSupport();
try
{
- resolve.callback->connectors(
- resolve.endpoint->connectors(
- getAddresses(resolve.host, resolve.port, _instance->protocolSupport(), true)));
+ if(threadObserver)
+ {
+ threadObserver->stateChanged(ThreadStateIdle, ThreadStateInUseForMisc);
+ }
+
+ r.callback->connectors(r.endpoint->connectors(getAddresses(r.host, r.port, protocol, true)));
+
+ if(threadObserver)
+ {
+ threadObserver->stateChanged(ThreadStateInUseForMisc, ThreadStateIdle);
+ }
}
catch(const Ice::LocalException& ex)
{
- resolve.callback->exception(ex);
+ if(r.observer)
+ {
+ r.observer->failed(ex.ice_name());
+ }
+ r.callback->exception(ex);
+ }
+
+ if(r.observer)
+ {
+ r.observer->detach();
}
}
for(deque<ResolveEntry>::const_iterator p = _queue.begin(); p != _queue.end(); ++p)
{
- p->callback->exception(Ice::CommunicatorDestroyedException(__FILE__, __LINE__));
+ Ice::CommunicatorDestroyedException ex(__FILE__, __LINE__);
+ p->callback->exception(ex);
+ if(p->observer)
+ {
+ p->observer->failed(ex.ice_name());
+ p->observer->detach();
+ }
+
}
_queue.clear();
}
+void
+IceInternal::EndpointHostResolver::updateObserver()
+{
+ Lock sync(*this);
+ const ObserverResolverPtr& resolver = _instance->initializationData().observerResolver;
+ if(resolver)
+ {
+ _observer = resolver->getThreadObserver("Communicator", name(), ThreadStateIdle, _observer);
+ if(_observer)
+ {
+ _observer->attach();
+ }
+ }
+}
+
#else
IceInternal::EndpointHostResolver::EndpointHostResolver(const InstancePtr& instance)
@@ -299,4 +386,9 @@ IceInternal::EndpointHostResolver::run()
{
}
+void
+IceInternal::EndpointHostResolver::updateObserver()
+{
+}
+
#endif