summaryrefslogtreecommitdiff
path: root/cpp/src/IceDiscovery/LookupI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceDiscovery/LookupI.cpp')
-rw-r--r--cpp/src/IceDiscovery/LookupI.cpp303
1 files changed, 303 insertions, 0 deletions
diff --git a/cpp/src/IceDiscovery/LookupI.cpp b/cpp/src/IceDiscovery/LookupI.cpp
new file mode 100644
index 00000000000..01e33ca1a2b
--- /dev/null
+++ b/cpp/src/IceDiscovery/LookupI.cpp
@@ -0,0 +1,303 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/Connection.h>
+#include <Ice/ObjectAdapter.h>
+#include <Ice/Communicator.h>
+#include <Ice/LocalException.h>
+
+#include <IceDiscovery/LookupI.h>
+
+using namespace std;
+using namespace Ice;
+using namespace IceDiscovery;
+
+IceDiscovery::Request::Request(LookupI* lookup, int retryCount) : _lookup(lookup), _nRetry(retryCount)
+{
+}
+
+bool
+IceDiscovery::Request::retry()
+{
+ return --_nRetry >= 0;
+}
+
+bool
+AdapterRequest::retry()
+{
+ return _proxies.empty() && --_nRetry >= 0;
+}
+
+bool
+AdapterRequest::response(const Ice::ObjectPrx& proxy, bool isReplicaGroup)
+{
+ if(isReplicaGroup)
+ {
+ if(_latency == IceUtil::Time())
+ {
+ _latency = (IceUtil::Time::now() - _start) * _lookup->latencyMultiplier();
+ _lookup->timer()->cancel(this);
+ _lookup->timer()->schedule(this, _latency);
+ }
+ _proxies.push_back(proxy);
+ return false;
+ }
+ finished(proxy);
+ return true;
+}
+
+void
+AdapterRequest::finished(const Ice::ObjectPrx& proxy)
+{
+ if(proxy || _proxies.empty())
+ {
+ RequestT<std::string, Ice::AMD_Locator_findAdapterByIdPtr>::finished(proxy);
+ return;
+ }
+ else if(_proxies.size() == 1)
+ {
+ RequestT<std::string, Ice::AMD_Locator_findAdapterByIdPtr>::finished(_proxies[0]);
+ return;
+ }
+
+ Ice::EndpointSeq endpoints;
+ Ice::ObjectPrx prx;
+ for(vector<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p)
+ {
+ if(!prx)
+ {
+ prx = *p;
+ }
+ Ice::EndpointSeq endpts = (*p)->ice_getEndpoints();
+ copy(endpts.begin(), endpts.end(), back_inserter(endpoints));
+ }
+ RequestT<std::string, Ice::AMD_Locator_findAdapterByIdPtr>::finished(prx->ice_endpoints(endpoints));
+}
+
+void
+AdapterRequest::runTimerTask()
+{
+ _lookup->adapterRequestTimedOut(this);
+}
+
+void
+ObjectRequest::response(const Ice::ObjectPrx& proxy)
+{
+ finished(proxy);
+}
+
+void
+ObjectRequest::runTimerTask()
+{
+ _lookup->objectRequestTimedOut(this);
+}
+
+LookupI::LookupI(const LocatorRegistryIPtr& registry, const LookupPrx& lookup, const Ice::PropertiesPtr& properties) :
+ _registry(registry),
+ _lookup(lookup),
+ _timeout(IceUtil::Time::milliSeconds(properties->getPropertyAsIntWithDefault("IceDiscovery.Timeout", 300))),
+ _retryCount(properties->getPropertyAsIntWithDefault("IceDiscovery.RetryCount", 3)),
+ _latencyMultiplier(properties->getPropertyAsIntWithDefault("IceDiscovery.LatencyMultiplier", 1)),
+ _domainId(properties->getProperty("IceDiscovery.DomainId")),
+ _timer(new IceUtil::Timer())
+{
+}
+
+void
+LookupI::setLookupReply(const LookupReplyPrx& lookupReply)
+{
+ _lookupReply = lookupReply;
+}
+
+void
+LookupI::findObjectById(const string& domainId, const Ice::Identity& id, const IceDiscovery::LookupReplyPrx& reply,
+ const Ice::Current& c)
+{
+ if(domainId != _domainId)
+ {
+ return; // Ignore.
+ }
+
+ Ice::ObjectPrx proxy = _registry->findObject(id);
+ if(proxy)
+ {
+ //
+ // Reply to the mulicast request using the given proxy.
+ //
+ getLookupReply(reply, c)->begin_foundObjectById(id, proxy);
+ }
+}
+
+void
+LookupI::findAdapterById(const string& domainId, const std::string& adapterId,
+ const IceDiscovery::LookupReplyPrx& reply, const Ice::Current& c)
+{
+ if(domainId != _domainId)
+ {
+ return; // Ignore.
+ }
+
+ bool isReplicaGroup;
+ Ice::ObjectPrx proxy = _registry->findAdapter(adapterId, isReplicaGroup);
+ if(proxy)
+ {
+ //
+ // Reply to the multicast request using the given proxy.
+ //
+ getLookupReply(reply, c)->begin_foundAdapterById(adapterId, proxy, isReplicaGroup);
+ }
+}
+
+void
+LookupI::findObject(const Ice::AMD_Locator_findObjectByIdPtr& cb, const Ice::Identity& id)
+{
+ Lock sync(*this);
+ map<Ice::Identity, ObjectRequestPtr>::const_iterator p = _objectRequests.find(id);
+ if(p == _objectRequests.end())
+ {
+ p = _objectRequests.insert(make_pair(id, new ObjectRequest(this, id, _retryCount))).first;
+ }
+
+ if(p->second->addCallback(cb))
+ {
+ _lookup->findObjectById(_domainId, id, _lookupReply);
+ _timer->schedule(p->second, _timeout);
+ }
+}
+
+void
+LookupI::findAdapter(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const std::string& adapterId)
+{
+ Lock sync(*this);
+ map<string, AdapterRequestPtr>::const_iterator p = _adapterRequests.find(adapterId);
+ if(p == _adapterRequests.end())
+ {
+ p = _adapterRequests.insert(make_pair(adapterId, new AdapterRequest(this, adapterId, _retryCount))).first;
+ }
+
+ if(p->second->addCallback(cb))
+ {
+ _lookup->findAdapterById(_domainId, adapterId, _lookupReply);
+ _timer->schedule(p->second, _timeout);
+ }
+}
+
+void
+LookupI::foundObject(const Ice::Identity& id, const Ice::ObjectPrx& proxy)
+{
+ Lock sync(*this);
+ map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(id);
+ if(p == _objectRequests.end())
+ {
+ return;
+ }
+
+ p->second->response(proxy);
+ _timer->cancel(p->second);
+ _objectRequests.erase(p);
+}
+
+void
+LookupI::foundAdapter(const std::string& adapterId, const Ice::ObjectPrx& proxy, bool isReplicaGroup)
+{
+ Lock sync(*this);
+ map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(adapterId);
+ if(p == _adapterRequests.end())
+ {
+ return;
+ }
+
+ if(p->second->response(proxy, isReplicaGroup))
+ {
+ _timer->cancel(p->second);
+ _adapterRequests.erase(p);
+ }
+}
+
+void
+LookupI::objectRequestTimedOut(const ObjectRequestPtr& request)
+{
+ Lock sync(*this);
+ map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(request->getId());
+ if(p == _objectRequests.end() || p->second.get() != request.get())
+ {
+ return;
+ }
+
+ if(request->retry())
+ {
+ _lookup->findObjectById(_domainId, request->getId(), _lookupReply);
+ _timer->schedule(p->second, _timeout);
+ }
+ else
+ {
+ request->finished(0);
+ _objectRequests.erase(p);
+ _timer->cancel(request);
+ }
+}
+
+void
+LookupI::adapterRequestTimedOut(const AdapterRequestPtr& request)
+{
+ Lock sync(*this);
+ map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(request->getId());
+ if(p == _adapterRequests.end() || p->second.get() != request.get())
+ {
+ return;
+ }
+
+ if(request->retry())
+ {
+ _lookup->findAdapterById(_domainId, request->getId(), _lookupReply);
+ _timer->schedule(p->second, _timeout);
+ }
+ else
+ {
+ request->finished(0);
+ _adapterRequests.erase(p);
+ _timer->cancel(request);
+ }
+}
+
+LookupReplyPrx
+LookupI::getLookupReply(const LookupReplyPrx& reply, const Ice::Current& current) const
+{
+ // Ice::UDPConnectionInfoPtr info = Ice::UDPConnectionInfoPtr::dynamicCast(current.con->getInfo());
+ // if(info)
+ // {
+ // Ice::CommunicatorPtr com = current.adapter->getCommunicator();
+ // ostringstream os;
+ // os << "\"" << com->identityToString(reply->ice_getIdentity()) << "\"";
+ // os << ":udp -h " << info->remoteAddress << " -p " << info->remotePort;
+ // return LookupReplyPrx::uncheckedCast(com->stringToProxy(os.str())->ice_datagram());
+ // }
+ // else
+ {
+ return reply;
+ }
+}
+
+LookupReplyI::LookupReplyI(const LookupIPtr& lookup) : _lookup(lookup)
+{
+}
+
+void
+LookupReplyI::foundObjectById(const Ice::Identity& id, const Ice::ObjectPrx& proxy, const Ice::Current&)
+{
+ _lookup->foundObject(id, proxy);
+}
+
+void
+LookupReplyI::foundAdapterById(const std::string& adapterId, const Ice::ObjectPrx& proxy, bool isReplicaGroup,
+ const Ice::Current&)
+{
+ _lookup->foundAdapter(adapterId, proxy, isReplicaGroup);
+}
+