Diffstat (limited to 'cpp/src/IceGrid/ObjectCache.cpp')
-rw-r--r--   cpp/src/IceGrid/ObjectCache.cpp   218
1 file changed, 204 insertions, 14 deletions
diff --git a/cpp/src/IceGrid/ObjectCache.cpp b/cpp/src/IceGrid/ObjectCache.cpp
index d8d89d8583f..df39caaef91 100644
--- a/cpp/src/IceGrid/ObjectCache.cpp
+++ b/cpp/src/IceGrid/ObjectCache.cpp
@@ -7,6 +7,8 @@
 //
 // **********************************************************************
 
+#include <IceUtil/Random.h>
+
 #include <Ice/Communicator.h>
 #include <Ice/IdentityUtil.h>
 #include <Ice/LoggerUtil.h>
@@ -18,8 +20,62 @@
 using namespace std;
 using namespace IceGrid;
 
-ObjectCache::ObjectCache(const Ice::CommunicatorPtr& communicator) :
-    _communicator(communicator)
+pointer_to_unary_function<int, int> ObjectCache::_rand(IceUtil::random);
+
+namespace IceGrid
+{
+
+struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::ObjectPrx, float>&, bool>
+{
+    bool operator()(const pair<Ice::ObjectPrx, float>& lhs, const pair<Ice::ObjectPrx, float>& rhs)
+    {
+        return lhs.second < rhs.second;
+    }
+};
+
+};
+
+ObjectCache::TypeEntry::TypeEntry(ObjectCache& cache) : _cache(cache)
+{
+}
+
+void
+ObjectCache::TypeEntry::add(const Ice::ObjectPrx& obj)
+{
+    _objects.insert(lower_bound(_objects.begin(), _objects.end(), obj, ::Ice::proxyIdentityLess), obj);
+}
+
+bool
+ObjectCache::TypeEntry::remove(const Ice::ObjectPrx& obj)
+{
+    Ice::ObjectProxySeq::iterator q = lower_bound(_objects.begin(), _objects.end(), obj, ::Ice::proxyIdentityLess);
+    assert((*q)->ice_getIdentity() == obj->ice_getIdentity());
+    _objects.erase(q);
+    return _objects.empty();
+}
+
+void
+ObjectCache::TypeEntry::addAllocationRequest(const ObjectAllocationRequestPtr& request)
+{
+    _requests.push_back(request);
+    request->allocate(); // TODO: XXX: monitor request timeout if timeout != -1
+}
+
+void
+ObjectCache::TypeEntry::released(const ObjectEntryPtr& entry)
+{
+    while(!_requests.empty() && !entry->isAllocated())
+    {
+        if(entry->tryAllocate(_requests.front()))
+        {
+            _requests.pop_front();
+        }
+    }
+}
+
+ObjectCache::ObjectCache(const Ice::CommunicatorPtr& communicator, AdapterCache& adapterCache) :
+    _communicator(communicator),
+    _adapterCache(adapterCache)
 {
 }
 
@@ -44,12 +100,12 @@ ObjectCache::add(const string& app, const string& adapterId, const string& endpo
     }
     entry->set(app, info);
 
-    map<string, set<Ice::Identity> >::iterator p = _types.find(entry->getType());
+    map<string, TypeEntry>::iterator p = _types.find(entry->getType());
     if(p == _types.end())
     {
-        p = _types.insert(p, map<string, set<Ice::Identity> >::value_type(entry->getType(), set<Ice::Identity>()));
+        p = _types.insert(p, make_pair(entry->getType(), TypeEntry(*this)));
     }
-    p->second.insert(desc.id);
+    p->second.add(entry->getProxy());
 
     if(_traceLevels && _traceLevels->object > 0)
     {
@@ -78,11 +134,10 @@ ObjectCache::remove(const Ice::Identity& id)
     ObjectEntryPtr entry = removeImpl(id);
     assert(entry);
 
-    map<string, set<Ice::Identity> >::iterator p = _types.find(entry->getType());
+    map<string, TypeEntry>::iterator p = _types.find(entry->getType());
     assert(p != _types.end());
-    p->second.erase(id);
-    if(p->second.empty())
-    {
+    if(p->second.remove(entry->getProxy()))
+    {
         _types.erase(p);
     }
 
@@ -95,19 +150,126 @@ ObjectCache::remove(const Ice::Identity& id)
     return entry;
 }
 
+void
+ObjectCache::allocateByType(const string& type, const ObjectAllocationRequestPtr& request)
+{
+    Lock sync(*this);
+    map<string, TypeEntry>::iterator p = _types.find(type);
+    if(p == _types.end())
+    {
+        request->response(0);
+        return;
+    }
+
+    Ice::ObjectProxySeq objects = p->second.getObjects();
+    random_shuffle(objects.begin(), objects.end(), _rand); // TODO: OPTIMIZE
+
+    for(Ice::ObjectProxySeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
+    {
+        //
+        // If tryAllocate() returns true, either the object was
+        // successfully allocated or the request canceled. In both
+        // cases, we're done!
+        //
+        if(getImpl((*p)->ice_getIdentity())->tryAllocate(request))
+        {
+            return;
+        }
+    }
+
+    if(request->getTimeout())
+    {
+        p->second.addAllocationRequest(request);
+    }
+    else
+    {
+        request->response(0);
+    }
+}
+
+void
+ObjectCache::allocateByTypeOnLeastLoadedNode(const string& type,
+                                             const ObjectAllocationRequestPtr& request,
+                                             LoadSample sample)
+{
+    Lock sync(*this);
+    map<string, TypeEntry>::iterator p = _types.find(type);
+    if(p == _types.end())
+    {
+        request->response(0);
+        return;
+    }
+
+    Ice::ObjectProxySeq objects = p->second.getObjects();
+    random_shuffle(objects.begin(), objects.end(), _rand); // TODO: OPTIMIZE
+    vector<pair<Ice::ObjectPrx, float> > objsWLoad;
+    objsWLoad.reserve(objects.size());
+    for(Ice::ObjectProxySeq::const_iterator o = objects.begin(); o != objects.end(); ++o)
+    {
+        float load = 1.0f;
+        if(!(*o)->ice_getAdapterId().empty())
+        {
+            try
+            {
+                load = _adapterCache.get((*o)->ice_getAdapterId())->getLeastLoadedNodeLoad(sample);
+            }
+            catch(const AdapterNotExistException&)
+            {
+            }
+        }
+        objsWLoad.push_back(make_pair(*o, load));
+    }
+    sort(objsWLoad.begin(), objsWLoad.end(), ObjectLoadCI());
+
+    for(vector<pair<Ice::ObjectPrx, float> >::const_iterator q = objsWLoad.begin(); q != objsWLoad.end(); ++q)
+    {
+        //
+        // If tryAllocate() returns true, either the object was
+        // successfully allocated or the request canceled. In both
+        // cases, we're done!
+        //
+        if(getImpl(q->first->ice_getIdentity())->tryAllocate(request))
+        {
+            return;
+        }
+    }
+
+    if(request->getTimeout())
+    {
+        p->second.addAllocationRequest(request);
+    }
+    else
+    {
+        request->response(0);
+    }
+}
+
+void
+ObjectCache::released(const ObjectEntryPtr& entry)
+{
+    Lock sync(*this);
+    map<string, TypeEntry>::iterator p = _types.find(entry->getType());
+    if(p == _types.end())
+    {
+        return;
+    }
+
+    p->second.released(entry);
+}
+
 Ice::ObjectProxySeq
 ObjectCache::getObjectsByType(const string& type)
 {
     Lock sync(*this);
 
     Ice::ObjectProxySeq proxies;
-    map<string, set<Ice::Identity> >::const_iterator p = _types.find(type);
+    map<string, TypeEntry>::const_iterator p = _types.find(type);
     if(p == _types.end())
     {
         return proxies;
     }
-    for(set<Ice::Identity>::const_iterator q = p->second.begin(); q != p->second.end(); ++q)
+
+    const Ice::ObjectProxySeq& objects = p->second.getObjects();
+    for(Ice::ObjectProxySeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
     {
-        ObjectEntryPtr entry = getImpl(*q);
+        ObjectEntryPtr entry = getImpl((*q)->ice_getIdentity());
         if(!entry->allocatable())
         {
             proxies.push_back(entry->getProxy());
@@ -131,8 +293,9 @@ ObjectCache::getAll(const string& expression)
     return infos;
 }
 
-ObjectEntry::ObjectEntry(Cache<Ice::Identity, ObjectEntry>&, const Ice::Identity&) :
-    Allocatable(false)
+ObjectEntry::ObjectEntry(Cache<Ice::Identity, ObjectEntry>& cache, const Ice::Identity&) :
+    Allocatable(false),
+    _cache(*dynamic_cast<ObjectCache*>(&cache))
 {
 }
 
@@ -173,3 +336,30 @@ ObjectEntry::canRemove()
 {
     return true;
 }
+
+bool
+ObjectEntry::release(const SessionIPtr& session)
+{
+    if(Allocatable::release(session))
+    {
+        //
+        // Notify the cache that this entry was released. Note that we
+        // don't use the released callback here. This could lead to
+        // deadlocks since released() is called with the allocation
+        // mutex locked.
+        //
+        _cache.released(this);
+        return true;
+    }
+    return false;
+}
+
+void
+ObjectEntry::allocated()
+{
+}
+
+void
+ObjectEntry::released()
+{
+}