summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/ObjectCache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/ObjectCache.cpp')
-rw-r--r--cpp/src/IceGrid/ObjectCache.cpp218
1 files changed, 204 insertions, 14 deletions
diff --git a/cpp/src/IceGrid/ObjectCache.cpp b/cpp/src/IceGrid/ObjectCache.cpp
index d8d89d8583f..df39caaef91 100644
--- a/cpp/src/IceGrid/ObjectCache.cpp
+++ b/cpp/src/IceGrid/ObjectCache.cpp
@@ -7,6 +7,8 @@
//
// **********************************************************************
+#include <IceUtil/Random.h>
+
#include <Ice/Communicator.h>
#include <Ice/IdentityUtil.h>
#include <Ice/LoggerUtil.h>
@@ -18,8 +20,62 @@
using namespace std;
using namespace IceGrid;
-ObjectCache::ObjectCache(const Ice::CommunicatorPtr& communicator) :
- _communicator(communicator)
+pointer_to_unary_function<int, int> ObjectCache::_rand(IceUtil::random);
+
+namespace IceGrid
+{
+
+struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::ObjectPrx, float>&, bool>
+{
+ bool operator()(const pair<Ice::ObjectPrx, float>& lhs, const pair<Ice::ObjectPrx, float>& rhs)
+ {
+ return lhs.second < rhs.second;
+ }
+};
+
+};
+
+ObjectCache::TypeEntry::TypeEntry(ObjectCache& cache) : _cache(cache)
+{
+}
+
+void
+ObjectCache::TypeEntry::add(const Ice::ObjectPrx& obj)
+{
+ _objects.insert(lower_bound(_objects.begin(), _objects.end(), obj, ::Ice::proxyIdentityLess), obj);
+}
+
+bool
+ObjectCache::TypeEntry::remove(const Ice::ObjectPrx& obj)
+{
+ Ice::ObjectProxySeq::iterator q = lower_bound(_objects.begin(), _objects.end(), obj, ::Ice::proxyIdentityLess);
+ assert((*q)->ice_getIdentity() == obj->ice_getIdentity());
+ _objects.erase(q);
+ return _objects.empty();
+}
+
+void
+ObjectCache::TypeEntry::addAllocationRequest(const ObjectAllocationRequestPtr& request)
+{
+ _requests.push_back(request);
+ request->allocate(); // TODO: XXX: monitor request timeout if timeout != -1
+}
+
+void
+ObjectCache::TypeEntry::released(const ObjectEntryPtr& entry)
+{
+ while(!_requests.empty() && !entry->isAllocated())
+ {
+ if(entry->tryAllocate(_requests.front()))
+ {
+ _requests.pop_front();
+ }
+ }
+}
+
+ObjectCache::ObjectCache(const Ice::CommunicatorPtr& communicator, AdapterCache& adapterCache) :
+ _communicator(communicator),
+ _adapterCache(adapterCache)
{
}
@@ -44,12 +100,12 @@ ObjectCache::add(const string& app, const string& adapterId, const string& endpo
}
entry->set(app, info);
- map<string, set<Ice::Identity> >::iterator p = _types.find(entry->getType());
+ map<string, TypeEntry>::iterator p = _types.find(entry->getType());
if(p == _types.end())
{
- p = _types.insert(p, map<string, set<Ice::Identity> >::value_type(entry->getType(), set<Ice::Identity>()));
+ p = _types.insert(p, make_pair(entry->getType(), TypeEntry(*this)));
}
- p->second.insert(desc.id);
+ p->second.add(entry->getProxy());
if(_traceLevels && _traceLevels->object > 0)
{
@@ -78,11 +134,10 @@ ObjectCache::remove(const Ice::Identity& id)
ObjectEntryPtr entry = removeImpl(id);
assert(entry);
- map<string, set<Ice::Identity> >::iterator p = _types.find(entry->getType());
+ map<string, TypeEntry>::iterator p = _types.find(entry->getType());
assert(p != _types.end());
- p->second.erase(id);
- if(p->second.empty())
- {
+ if(p->second.remove(entry->getProxy()))
+ {
_types.erase(p);
}
@@ -95,19 +150,126 @@ ObjectCache::remove(const Ice::Identity& id)
return entry;
}
+void
+ObjectCache::allocateByType(const string& type, const ObjectAllocationRequestPtr& request)
+{
+ Lock sync(*this);
+ map<string, TypeEntry>::iterator p = _types.find(type);
+ if(p == _types.end())
+ {
+ request->response(0);
+ return;
+ }
+
+ Ice::ObjectProxySeq objects = p->second.getObjects();
+ random_shuffle(objects.begin(), objects.end(), _rand); // TODO: OPTIMIZE
+ for(Ice::ObjectProxySeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
+ {
+ //
+ // If tryAllocate() returns true, either the object was
+ // successfully allocated or the request canceled. In both
+ // cases, we're done!
+ //
+ if(getImpl((*p)->ice_getIdentity())->tryAllocate(request))
+ {
+ return;
+ }
+ }
+
+ if(request->getTimeout())
+ {
+ p->second.addAllocationRequest(request);
+ }
+ else
+ {
+ request->response(0);
+ }
+}
+
+void
+ObjectCache::allocateByTypeOnLeastLoadedNode(const string& type,
+ const ObjectAllocationRequestPtr& request,
+ LoadSample sample)
+{
+ Lock sync(*this);
+ map<string, TypeEntry>::iterator p = _types.find(type);
+ if(p == _types.end())
+ {
+ request->response(0);
+ return;
+ }
+
+ Ice::ObjectProxySeq objects = p->second.getObjects();
+ random_shuffle(objects.begin(), objects.end(), _rand); // TODO: OPTIMIZE
+ vector<pair<Ice::ObjectPrx, float> > objsWLoad;
+ objsWLoad.reserve(objects.size());
+ for(Ice::ObjectProxySeq::const_iterator o = objects.begin(); o != objects.end(); ++o)
+ {
+ float load = 1.0f;
+ if(!(*o)->ice_getAdapterId().empty())
+ {
+ try
+ {
+ load = _adapterCache.get((*o)->ice_getAdapterId())->getLeastLoadedNodeLoad(sample);
+ }
+ catch(const AdapterNotExistException&)
+ {
+ }
+ }
+ objsWLoad.push_back(make_pair(*o, load));
+ }
+ sort(objsWLoad.begin(), objsWLoad.end(), ObjectLoadCI());
+
+ for(vector<pair<Ice::ObjectPrx, float> >::const_iterator q = objsWLoad.begin(); q != objsWLoad.end(); ++q)
+ {
+ //
+ // If tryAllocate() returns true, either the object was
+ // successfully allocated or the request canceled. In both
+ // cases, we're done!
+ //
+ if(getImpl(q->first->ice_getIdentity())->tryAllocate(request))
+ {
+ return;
+ }
+ }
+
+ if(request->getTimeout())
+ {
+ p->second.addAllocationRequest(request);
+ }
+ else
+ {
+ request->response(0);
+ }
+}
+
+void
+ObjectCache::released(const ObjectEntryPtr& entry)
+{
+ Lock sync(*this);
+ map<string, TypeEntry>::iterator p = _types.find(entry->getType());
+ if(p == _types.end())
+ {
+ return;
+ }
+
+ p->second.released(entry);
+}
+
Ice::ObjectProxySeq
ObjectCache::getObjectsByType(const string& type)
{
Lock sync(*this);
Ice::ObjectProxySeq proxies;
- map<string, set<Ice::Identity> >::const_iterator p = _types.find(type);
+ map<string, TypeEntry>::const_iterator p = _types.find(type);
if(p == _types.end())
{
return proxies;
}
- for(set<Ice::Identity>::const_iterator q = p->second.begin(); q != p->second.end(); ++q)
+ const Ice::ObjectProxySeq& objects = p->second.getObjects();
+ for(Ice::ObjectProxySeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
{
- ObjectEntryPtr entry = getImpl(*q);
+ ObjectEntryPtr entry = getImpl((*q)->ice_getIdentity());
if(!entry->allocatable())
{
proxies.push_back(entry->getProxy());
@@ -131,8 +293,9 @@ ObjectCache::getAll(const string& expression)
return infos;
}
-ObjectEntry::ObjectEntry(Cache<Ice::Identity, ObjectEntry>&, const Ice::Identity&) :
- Allocatable(false)
+ObjectEntry::ObjectEntry(Cache<Ice::Identity, ObjectEntry>& cache, const Ice::Identity&) :
+ Allocatable(false),
+ _cache(*dynamic_cast<ObjectCache*>(&cache))
{
}
@@ -173,3 +336,30 @@ ObjectEntry::canRemove()
{
return true;
}
+
+bool
+ObjectEntry::release(const SessionIPtr& session)
+{
+ if(Allocatable::release(session))
+ {
+ //
+ // Notify the cache that this entry was released. Note that we
+ // don't use the released callback here. This could lead to
+ // deadlocks since released() is called with the allocation
+ // mutex locked.
+ //
+ _cache.released(this);
+ return true;
+ }
+ return false;
+}
+
+void
+ObjectEntry::allocated()
+{
+}
+
+void
+ObjectEntry::released()
+{
+}